[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-08-18 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755904#comment-17755904
 ] 

Haruki Okada commented on KAFKA-15046:
--

I submitted a patch: [https://github.com/apache/kafka/pull/14242].

In the meantime, I tested the patch (ported to 3.3.2, which is the 
version we use) in our experimental environment:
 * Setting:
 ** num.io.threads = 48
 ** incoming byte-rate: 18MB/sec
 ** A 300ms artificial write-delay injected into the device using 
[device-mapper|https://github.com/kawamuray/ddi]
 * Without patch:
 ** !image-2023-08-18-19-23-36-597.png|width=292,height=164!
 ** request-handler idle ratio is below 40%
 ** produce-response time 99.9%ile is over 1 sec
 ** Producer-state snapshotting takes hundreds of milliseconds
 *** 
{code:java}
(snip)
[2023-08-18 13:23:02,552] INFO [ProducerStateManager partition=xxx-3] Wrote producer snapshot at offset 3030259 with 0 producer ids in 777 ms. (kafka.log.ProducerStateManager)
[2023-08-18 13:23:02,852] INFO [ProducerStateManager partition=xxx-10] Wrote producer snapshot at offset 2991767 with 0 producer ids in 678 ms. (kafka.log.ProducerStateManager){code}

 * With patch:
 ** !image-2023-08-18-19-29-56-377.png|width=297,height=169!
 ** request-handler idle ratio stays at 75%
 ** produce-response time 99.9%ile is around 100ms
 ** producer-state snapshotting takes a few milliseconds in most cases
 *** 
{code:java}
(snip)
[2023-08-18 13:40:09,383] INFO [ProducerStateManager partition=xxx-3] Wrote producer snapshot at offset 6219284 with 0 producer ids in 0 ms. (kafka.log.ProducerStateManager)
[2023-08-18 13:40:09,818] INFO [ProducerStateManager partition=icbm-2] Wrote producer snapshot at offset 6208459 with 0 producer ids in 0 ms. (kafka.log.ProducerStateManager){code}

> Produce performance issue under high disk load
> --
>
> Key: KAFKA-15046
> URL: https://issues.apache.org/jira/browse/KAFKA-15046
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.3.2
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Major
>  Labels: performance
> Attachments: image-2023-06-01-12-46-30-058.png, 
> image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, 
> image-2023-06-01-12-56-19-108.png, image-2023-08-18-19-23-36-597.png, 
> image-2023-08-18-19-29-56-377.png
>
>
> * Phenomenon:
>  ** !image-2023-06-01-12-46-30-058.png|width=259,height=236!
>  ** Producer response time 99%ile degraded significantly when we performed 
> replica reassignment on the cluster
>  *** RequestQueue scope was significant
>  ** Request-time throttling also happened at the time of the incident. This 
> caused producers to delay sending messages in the meantime.
>  ** The disk I/O latency was higher than usual due to the high load from 
> replica reassignment.
>  *** !image-2023-06-01-12-56-19-108.png|width=255,height=128!
>  * Analysis:
>  ** The request-handler utilization was much higher than usual.
>  *** !image-2023-06-01-12-52-40-959.png|width=278,height=113!
>  ** Also, thread time utilization was much higher than usual on almost all 
> users
>  *** !image-2023-06-01-12-54-04-211.png|width=276,height=110!
>  ** We took jstack dumps several times; in most of them, one request-handler 
> was doing fsync to flush ProducerState while the other request-handlers were 
> waiting on Log#lock to append messages.
>  * 
>  ** 
>  *** 
> {code:java}
> "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 runnable  [0x7ef9a12e2000]
>    java.lang.Thread.State: RUNNABLE
>     at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native Method)
>     at sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
>     at sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
>     at kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451)
>     at kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754)
>     at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544)
>     - locked <0x00060d75d820> (a java.lang.Object)
>     at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523)
>     - locked <0x00060d75d820> (a java.lang.Object)
>     at kafka.log.UnifiedLog.append(UnifiedLog.scala:919)
>     - locked <0x00060d75d820> (a java.lang.Object)
>     at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
>     at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
> at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:

[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-08-18 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755898#comment-17755898
 ] 

Haruki Okada commented on KAFKA-15046:
--

After digging into the fsync call paths in detail, I summarized the problem and 
the solutions below:
h2. Problem
 * Any blocking operation performed while holding UnifiedLog.lock can lead to 
serious performance (even availability) issues, yet there are currently several 
paths that call fsync(2) inside the lock
 ** While the lock is held, all subsequent produce requests against the 
partition may block
 ** With poor disk performance, this easily causes all request-handlers to 
become busy
 ** Even worse, when a disk experiences a glitch lasting tens of seconds (not 
rare with spinning drives), the broker becomes unable to process any requests 
while remaining unfenced from the cluster (i.e. a "zombie"-like state)
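
The blocking pattern described in this list can be reproduced in miniature. The following is only a sketch and not Kafka code: the class and method names are hypothetical, `lock` stands in for UnifiedLog.lock, and `slowFsync` simulates fsync(2) on a slow disk with a sleep. It measures how long an append waits for the lock when a concurrent roll performs the slow call inside versus outside the lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Illustrative only: none of these names exist in Kafka. */
final class LockedFsyncDemo {
    private final Object lock = new Object();                 // stands in for UnifiedLog.lock
    private final CountDownLatch lockTaken = new CountDownLatch(1);
    private final long fsyncMillis;                           // simulated disk latency

    LockedFsyncDemo(long fsyncMillis) {
        this.fsyncMillis = fsyncMillis;
    }

    /** Simulates a slow fsync(2) by sleeping. */
    private void slowFsync() {
        try {
            Thread.sleep(fsyncMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Problematic shape: the slow call happens while the lock is held. */
    void rollWithFsyncInsideLock() {
        synchronized (lock) {
            lockTaken.countDown();
            slowFsync();
        }
    }

    /** Alternative shape: only cheap work under the lock, slow call after release. */
    void rollWithFsyncOutsideLock() {
        synchronized (lock) {
            lockTaken.countDown();
        }
        slowFsync();
    }

    /** An append needs the same lock; returns how long it waited for it, in ms. */
    long appendWaitMillis() {
        long start = System.nanoTime();
        synchronized (lock) {
            // the append itself would happen here
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** Starts a roll on another thread, then measures how long an append is blocked. */
    static long measure(boolean fsyncInsideLock, long fsyncMillis) {
        LockedFsyncDemo log = new LockedFsyncDemo(fsyncMillis);
        Runnable roll = fsyncInsideLock ? log::rollWithFsyncInsideLock : log::rollWithFsyncOutsideLock;
        Thread roller = new Thread(roll);
        roller.start();
        try {
            log.lockTaken.await();                            // roller has entered the lock
            long waited = log.appendWaitMillis();
            roller.join();
            return waited;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With a simulated latency of 300 ms, `LockedFsyncDemo.measure(true, 300)` should report a wait close to the full latency, while `LockedFsyncDemo.measure(false, 300)` should return almost immediately. With many request-handler threads appending to the same partition, every one of them pays that wait.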

h2. Analysis of fsync(2) inside UnifiedLog.lock

First, fsyncs at start-up/shutdown aren't a problem since the broker 
isn't processing requests at that time.

Given that, there are essentially 4 problematic call paths, listed below:
h3. 1. [ProducerStateManager.takeSnapshot at 
UnifiedLog.roll|https://github.com/apache/kafka/blob/3f4816dd3eafaf1a0636d3ee689069f897c99e28/core/src/main/scala/kafka/log/UnifiedLog.scala#L2133]
 * Here, the solution is simply to move the fsync(2) call to the scheduler 
thread as part of the existing "flush-log" job (before incrementing the 
recovery point)

h3. 2. [ProducerStateManager.removeAndMarkSnapshotForDeletion as part of log 
segment 
deletion|https://github.com/apache/kafka/blob/3f4816dd3eafaf1a0636d3ee689069f897c99e28/core/src/main/scala/kafka/log/UnifiedLog.scala#L2133]
 * removeAndMarkSnapshotForDeletion calls Utils.atomicMoveWithFallback, which 
flushes the parent directory when renaming the file to add the .deleted suffix
 * Here, I believe we don't need to flush the parent directory.
 * In the worst case, a few producer snapshots that should have been deleted 
remain without the .deleted suffix after an unclean shutdown
 ** In this case, these files will eventually be deleted, so it shouldn't be a 
big problem.
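
To make the trade-off concrete, here is a minimal sketch of a rename that appends the .deleted suffix with an optional parent-directory fsync. It is not Kafka's Utils.atomicMoveWithFallback; the class name, method names, and the `flushParentDir` parameter are assumptions made for illustration, and only standard `java.nio.file` calls are used.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

/** Illustrative only; not Kafka's implementation. */
final class MarkForDeletionSketch {

    /**
     * Renames source by appending ".deleted". When flushParentDir is true the parent
     * directory is fsynced as well, which makes the rename durable but can block for a
     * long time on a slow disk.
     */
    static Path markForDeletion(Path source, boolean flushParentDir) {
        Path target = source.resolveSibling(source.getFileName() + ".deleted");
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
            if (flushParentDir) {
                // Directory fsync; works on Linux, may be unsupported on other platforms.
                Path parent = target.toAbsolutePath().getParent();
                try (FileChannel dir = FileChannel.open(parent, StandardOpenOption.READ)) {
                    dir.force(true);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target;
    }

    /** Creates a throwaway snapshot file and renames it without the directory flush. */
    static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("snapshot-sketch");
            Path snapshot = Files.createFile(dir.resolve("00000000000000000042.snapshot"));
            Path renamed = markForDeletion(snapshot, false);
            return Files.exists(renamed)
                && Files.notExists(snapshot)
                && renamed.getFileName().toString().endsWith(".snapshot.deleted");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Skipping the directory flush means the rename may not have reached disk when the process crashes, which corresponds to the worst case above: the file reappears under its old name and is cleaned up later.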

h3. 3. [LeaderEpochFileCache.truncateFromStart when incrementing 
log-start-offset|https://github.com/apache/kafka/blob/3f4816dd3eafaf1a0636d3ee689069f897c99e28/core/src/main/scala/kafka/log/UnifiedLog.scala#L986]
 * This path is called from deleteRecords on request-handler threads.
 * Here, we don't actually need fsync(2) either.
 * Upon unclean shutdown, a few leader epochs might remain in the file, but 
they will be [handled by 
LogLoader|https://github.com/apache/kafka/blob/3f4816dd3eafaf1a0636d3ee689069f897c99e28/core/src/main/scala/kafka/log/LogLoader.scala#L185]
 on start-up, so this is not a problem

h3. 4. [LeaderEpochFileCache.truncateFromEnd as part of log 
truncation|https://github.com/apache/kafka/blob/3f4816dd3eafaf1a0636d3ee689069f897c99e28/core/src/main/scala/kafka/log/UnifiedLog.scala#L1663]
 
 * Though this path is called mainly on replica fetcher threads, blocking 
replica fetchers isn't ideal either, since it could degrade remote-scope produce 
performance on the leader side
 * Likewise, we don't need fsync(2) here, since any epochs left untruncated 
will be handled by the log loading procedure


[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-08-16 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755351#comment-17755351
 ] 

Haruki Okada commented on KAFKA-15046:
--

[~junrao] Hi, sorry for the late response.

Thanks for your suggestion.

 

> Another way to improve this is to move the LeaderEpochFile flushing logic to 
> be part of the flushing of rolled segments

 

Yeah, that makes sense.

I think the ProducerState snapshot should then also be unified into the existing 
flushing logic, instead of fsync-ing ProducerState separately in log.roll (i.e. 
current Kafka behavior) or submitting it to the scheduler separately (i.e. as the 
[ongoing patch|https://github.com/apache/kafka/pull/13782] does)

 


[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-11 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17731375#comment-17731375
 ] 

Jun Rao commented on KAFKA-15046:
-

The potential issue with moving the LeaderEpochFile flushing to an arbitrary 
background thread is that it may not be synchronized with the log flushing and 
could lead to a situation where the flushed LeaderEpochFile doesn't reflect all 
the log data before the recovery point.


[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-09 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17731119#comment-17731119
 ] 

Jun Rao commented on KAFKA-15046:
-

[~ocadaruma] : Thanks for identifying the problem. Another way to improve this 
is to move the LeaderEpochFile flushing logic to be part of the flushing of 
rolled segments. Currently, we flush each rolled segment in the background 
thread already. We could just make sure that we also flush the LeaderEpochFile 
there before advancing the recovery point. This way, we don't need to introduce 
a separate background task.
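
The ordering suggested here can be sketched as follows. This is an illustration under assumptions, not Kafka's flush code: the class, the method names, and the recorded step strings are hypothetical, and the actual file I/O is replaced by entries in a list so that the order is visible. The point is only that the recovery point advances after both the segments and the leader-epoch file have been flushed.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: models the order of operations, not real file I/O. */
final class FlushOrderSketch {
    /** Records the order in which the steps ran. */
    final List<String> steps = new ArrayList<>();

    // Assumption: only the single background flush thread calls flush().
    private volatile long recoveryPoint = 0L;

    private void flushSegments(long upToOffset) {
        steps.add("flush segments < " + upToOffset);
    }

    private void flushLeaderEpochFile() {
        steps.add("flush leader-epoch file");
    }

    /** Intended to run on the background flush thread, never on a request handler. */
    void flush(long offset) {
        if (offset <= recoveryPoint) {
            return;                        // nothing new to flush
        }
        flushSegments(offset);             // 1. log data
        flushLeaderEpochFile();            // 2. metadata describing that data
        recoveryPoint = offset;            // 3. only now advance the recovery point
        steps.add("recovery point = " + offset);
    }

    long recoveryPoint() {
        return recoveryPoint;
    }
}
```

Because the epoch file is flushed in the same routine as the segments, the concern about an unsynchronized background task does not arise in this shape: a recovery point of N implies that everything describing offsets below N has already been flushed.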


[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-06 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17729618#comment-17729618
 ] 

Luke Chen commented on KAFKA-15046:
---

[~ocadaruma] , thanks for the help! 

Ping me when PR is created. Thanks.

> Produce performance issue under high disk load
> --
>
> Key: KAFKA-15046
> URL: https://issues.apache.org/jira/browse/KAFKA-15046
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.3.2
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Major
>  Labels: performance
> Attachments: image-2023-06-01-12-46-30-058.png, 
> image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, 
> image-2023-06-01-12-56-19-108.png
>
>
> * Phenomenon:
>  ** !image-2023-06-01-12-46-30-058.png|width=259,height=236!
>  ** Producer response time 99%ile degraded significantly when we performed 
> replica reassignment on the cluster
>  *** RequestQueue scope was significant
>  ** Request-time throttling also happened at the time of the incident. This 
> caused producers to delay sending messages in the meantime.
>  ** The disk I/O latency was higher than usual due to the high load from 
> replica reassignment.
>  *** !image-2023-06-01-12-56-19-108.png|width=255,height=128!
>  * Analysis:
>  ** The request-handler utilization was much higher than usual.
>  *** !image-2023-06-01-12-52-40-959.png|width=278,height=113!
>  ** Also, thread time utilization was much higher than usual on almost all 
> users
>  *** !image-2023-06-01-12-54-04-211.png|width=276,height=110!
>  ** We took jstack dumps several times; in most of them, one request-handler 
> was doing fsync to flush ProducerState while the other request-handlers were 
> waiting on Log#lock to append messages.
>  * 
>  ** 
>  *** 
> {code:java}
> "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 runnable  [0x7ef9a12e2000]
>    java.lang.Thread.State: RUNNABLE
>     at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native Method)
>     at sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
>     at sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
>     at kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451)
>     at kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754)
>     at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544)
>     - locked <0x00060d75d820> (a java.lang.Object)
>     at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523)
>     - locked <0x00060d75d820> (a java.lang.Object)
>     at kafka.log.UnifiedLog.append(UnifiedLog.scala:919)
>     - locked <0x00060d75d820> (a java.lang.Object)
>     at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
>     at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
>     at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158)
>     at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956)
>     at kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown Source)
>     at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>     at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>     at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>     at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944)
>     at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602)
>     at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:175)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
>     at java.lang.Thread.run(java.base@11.0.17/Thread.java:829) {code}
>  * 
>  ** There were also many log entries showing that writing producer snapshots 
> took hundreds of milliseconds.
>  *** 
> {code:java}
> ...
> [2023-05-01 11:08:36,689] INFO [ProducerStateManager partition=xxx-4] Wrote producer snapshot at offset 1748817854 with 8 producer ids in 809 ms. (kafka.log.ProducerStateManager)
> [2023-05-01 11:08:37,319] INFO [ProducerStateManager partition=yyy-34] Wrote producer snapshot at offset 247996937813 with 0 producer ids in 547 ms. (kafka.log.ProducerStateManager)
> [2023-05-01 11:08:38,887] INFO [ProducerStateManager partition=zzz-9] Wrote producer snapshot at offset 226222355404 with 0 producer ids in 576 ms. (kafka.log.ProducerStateManager)
> ... {

[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-05 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17729561#comment-17729561
 ] 

Haruki Okada commented on KAFKA-15046:
--

I see, thank you for pointing that out.

Hmm, now I agree that just making the fileDescriptor fsync call asynchronous 
should be fine.

(I still wonder whether we could move LeaderEpochFileCache's method calls 
outside of Log.lock, because the underlying CheckpointFile does exclusive 
control by itself 
([https://github.com/apache/kafka/blob/3.3.2/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java#L56]). 
However, carefully checking every path that calls fsync in order to move it 
outside of Log.lock is too error-prone and may be hard to maintain.)

 

May I assign this issue to myself?

I would like to submit a patch to make LeaderEpochFile's fsync asynchronous. 
(For the ProducerState snapshot, [https://github.com/apache/kafka/pull/13782] 
should already cover it.)


[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-02 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728606#comment-17728606
 ] 

Luke Chen commented on KAFKA-15046:
---

Currently, the RW lock in LeaderEpochFileCache only protects the epoch entries 
in the cache, so not every `flush` call runs under the write lock. For example, 
LeaderEpochFileCache#assign takes the lock only while updating the cache, not 
while flushing.

[https://github.com/apache/kafka/blob/3.3.2/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L59]
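
A minimal Java sketch of the locking pattern described above, for illustration only. It is not Kafka's LeaderEpochFileCache: the class `EpochCacheSketch`, the `long[]` entry layout, and the `checkpoint` callback are all hypothetical names. The point it shows is that the write lock covers only the in-memory update, while the slow checkpoint write runs after the lock is released.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

// Hypothetical illustration only; this is not Kafka's LeaderEpochFileCache.
public class EpochCacheSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Each entry is {epoch, startOffset}.
    private final List<long[]> entries = new ArrayList<>();
    // Stand-in for the checkpoint file; assumed to do its own synchronization.
    private final Consumer<List<long[]>> checkpoint;

    public EpochCacheSketch(Consumer<List<long[]>> checkpoint) {
        this.checkpoint = checkpoint;
    }

    public void assign(long epoch, long startOffset) {
        lock.writeLock().lock();
        try {
            entries.add(new long[] {epoch, startOffset}); // in-memory update only
        } finally {
            lock.writeLock().unlock();
        }
        flush(); // slow I/O happens after the write lock is released
    }

    private void flush() {
        List<long[]> snapshot;
        lock.readLock().lock();
        try {
            snapshot = new ArrayList<>(entries); // cheap copy under the read lock
        } finally {
            lock.readLock().unlock();
        }
        checkpoint.accept(snapshot); // written without holding the cache lock
    }

    public static void main(String[] args) {
        EpochCacheSketch cache = new EpochCacheSketch(
            snapshot -> System.out.println("checkpointing " + snapshot.size() + " entries"));
        cache.assign(1, 0L);
        cache.assign(2, 100L);
    }
}
```

One caveat of this sketch: with concurrent callers, an older snapshot could reach the checkpoint after a newer one, so a real implementation would need to decide how to order or coalesce flushes.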

 

I think it should be fine to follow the same approach as 
[https://github.com/apache/kafka/pull/13782] and move the [fileDescriptor sync 
call|https://github.com/apache/kafka/blob/3.3.2/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java#L95]
 to a background thread. WDYT?

 

 

> Produce performance issue under high disk load
> --
>
> Key: KAFKA-15046
> URL: https://issues.apache.org/jira/browse/KAFKA-15046
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.3.2
>Reporter: Haruki Okada
>Priority: Major
>  Labels: performance
> Attachments: image-2023-06-01-12-46-30-058.png, 
> image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, 
> image-2023-06-01-12-56-19-108.png
>
>
> * Phenomenon:
>  ** !image-2023-06-01-12-46-30-058.png|width=259,height=236!
>  ** Producer response time 99%ile got quite bad when we performed replica 
> reassignment on the cluster
>  *** RequestQueue scope was significant
>  ** Request-time throttling also kicked in at the time of the incident, which 
> caused producers to delay sending messages in the meantime.
>  ** The disk I/O latency was higher than usual due to the high load for 
> replica reassignment.
>  *** !image-2023-06-01-12-56-19-108.png|width=255,height=128!
>  * Analysis:
>  ** The request-handler utilization was much higher than usual.
>  *** !image-2023-06-01-12-52-40-959.png|width=278,height=113!
>  ** Also, thread time utilization was much higher than usual on almost all 
> users
>  *** !image-2023-06-01-12-54-04-211.png|width=276,height=110!
>  ** We took jstack dumps several times; in most of them, one request-handler 
> was doing fsync to flush ProducerState while the other request-handlers were 
> waiting on Log#lock to append messages.
>  * 
>  ** 
>  *** 
> {code:java}
> "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 runnable  [0x7ef9a12e2000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native Method)
>         at sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
>         at sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
>         at kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451)
>         at kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754)
>         at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544)
>         - locked <0x00060d75d820> (a java.lang.Object)
>         at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523)
>         - locked <0x00060d75d820> (a java.lang.Object)
>         at kafka.log.UnifiedLog.append(UnifiedLog.scala:919)
>         - locked <0x00060d75d820> (a java.lang.Object)
>         at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
>         at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158)
>         at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956)
>         at kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown Source)
>         at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>         at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>         at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602)
>         at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:175)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
>         at java.lang.Thread.run(java.base@11.0.17/Thread.java:829)
> {code}
>  * 
>  ** There were also a bunch of log entries showing that writing producer 
> snapshots took hundreds of milliseconds.
>  *** 
> {code:java}
> ...

[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-01 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728551#comment-17728551
 ] 

Haruki Okada commented on KAFKA-15046:
--

[~showuon] Maybe I linked the wrong file.

What I had in mind is to call any LeaderEpochFileCache method that needs 
flush() outside of the Log's global lock.

LeaderEpochFileCache already enforces mutual exclusion with its own RW lock, so 
I think we don't need to call it while holding the Log's global lock.

[https://github.com/apache/kafka/blob/3.3.2/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L44]


[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-01 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728547#comment-17728547
 ] 

Luke Chen commented on KAFKA-15046:
---

I agree that io_uring is a good way to fix this, but it needs more discussion. 
We may also still need a fallback solution for older OSes.

> Writing to CheckpointFile is already synchronized, so can't we just move 
> checkpointing to outside of the lock?

[~ocadaruma], I don't follow. If we don't hold the lock during the write, other 
threads might read partially written data or write concurrently. Why do you 
think we don't need the lock?

 


[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-01 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728515#comment-17728515
 ] 

Haruki Okada commented on KAFKA-15046:
--

Yeah, io_uring is promising.

However, it only works with newer kernels (which may not be easy for some 
on-premises Kafka users to upgrade to) and would require changes to many parts 
of the code base.

For the leader-epoch cache, checkpointing is already done in the scheduler 
thread, so I think we should adopt solution 2.

Writing to CheckpointFile is already synchronized, so can't we just move 
checkpointing outside of the lock? 
https://github.com/apache/kafka/blob/3.3.2/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java#L76


[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-01 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728275#comment-17728275
 ] 

Divij Vaidya commented on KAFKA-15046:
--

Yes, that is right: leaderEpochCheckpoint is another I/O operation that Kafka 
performs while holding the global partition lock. 

IMO, we need to move to async disk I/O using 
[io_uring|https://unixism.net/loti/what_is_io_uring.html] to prevent thread 
blocking (and lock contention) while performing disk I/O. That would be a single 
solution for all I/O operations we perform from Kafka.

Other alternative solutions are:
1. Delegate all disk flush operations to a separate thread pool in order to 
release the lock (this is the strategy followed by the currently open PR and by 
the current mechanism for disk flush)
2. Change the coarse-grained lock over a single partition to fine-grained 
locks. For example, flushing the leader epoch should not require the same 
lock as writing to a segment, since the two do not depend on each other.

[~junrao] do you have any thoughts on how to proceed with fixing this?
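
To make alternative 1 concrete, here is a minimal Java sketch under stated assumptions. It is not the open PR and not Kafka's UnifiedLog; `BackgroundFlushSketch`, `append`, and `flushAsync` are hypothetical names. Appends take the lock but only write to the page cache, while `FileChannel.force` runs on a separate single-thread executor, so a slow fsync does not hold the lock that appenders need.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; this is not Kafka's UnifiedLog.
public class BackgroundFlushSketch {
    private final Object lock = new Object(); // stands in for Log#lock
    private final ExecutorService flusher = Executors.newSingleThreadExecutor();
    private final FileChannel channel;

    public BackgroundFlushSketch(Path file) throws IOException {
        channel = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    // Appends under the lock; the bytes go to the page cache, no fsync here.
    public void append(byte[] record) throws IOException {
        synchronized (lock) {
            ByteBuffer buf = ByteBuffer.wrap(record);
            while (buf.hasRemaining()) {
                channel.write(buf);
            }
        }
    }

    // Schedules the fsync on the background thread; the caller never holds
    // the lock while the disk is syncing.
    public CompletableFuture<Void> flushAsync() {
        return CompletableFuture.runAsync(() -> {
            try {
                channel.force(true);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, flusher);
    }

    public void close() throws IOException, InterruptedException {
        flusher.shutdown();
        flusher.awaitTermination(10, TimeUnit.SECONDS); // let pending flushes finish
        channel.close();
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("segment", ".log");
        BackgroundFlushSketch log = new BackgroundFlushSketch(file);
        log.append("record-1".getBytes(StandardCharsets.UTF_8));
        log.flushAsync().join(); // wait here only to make the demo deterministic
        log.close();
        System.out.println("bytes on disk: " + Files.size(file));
    }
}
```

The trade-off in this sketch is durability timing: data appended but not yet flushed can be lost on a crash, so anything that relies on the file being synced has to wait on the returned future or tolerate recovery.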


[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-01 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728266#comment-17728266
 ] 

Haruki Okada commented on KAFKA-15046:
--

Hm, when I dug further into this, I noticed there's another path that causes 
essentially the same phenomenon.

 
{code:java}
"data-plane-kafka-request-handler-17" #169 daemon prio=5 os_prio=0 cpu=50994542.49ms elapsed=595635.65s tid=0x7efdaebabe30 nid=0x1e707 runnable  [0x7ef9a0fdf000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native Method)
        at sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
        at sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
        at org.apache.kafka.common.utils.Utils.flushDir(Utils.java:966)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:951)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:925)
        at org.apache.kafka.server.common.CheckpointFile.write(CheckpointFile.java:98)
        - locked <0x000680fc4930> (a java.lang.Object)
        at kafka.server.checkpoints.CheckpointFileWithFailureHandler.write(CheckpointFileWithFailureHandler.scala:37)
        at kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:71)
        at kafka.server.epoch.LeaderEpochFileCache.flush(LeaderEpochFileCache.scala:291)
        at kafka.server.epoch.LeaderEpochFileCache.$anonfun$truncateFromStart$3(LeaderEpochFileCache.scala:263)
        at kafka.server.epoch.LeaderEpochFileCache.$anonfun$truncateFromStart$3$adapted(LeaderEpochFileCache.scala:259)
        at kafka.server.epoch.LeaderEpochFileCache$$Lambda$571/0x00080045f040.apply(Unknown Source)
        at scala.Option.foreach(Option.scala:437)
        at kafka.server.epoch.LeaderEpochFileCache.$anonfun$truncateFromStart$1(LeaderEpochFileCache.scala:259)
        at kafka.server.epoch.LeaderEpochFileCache.truncateFromStart(LeaderEpochFileCache.scala:254)
        at kafka.log.UnifiedLog.$anonfun$maybeIncrementLogStartOffset$4(UnifiedLog.scala:1043)
        at kafka.log.UnifiedLog.$anonfun$maybeIncrementLogStartOffset$4$adapted(UnifiedLog.scala:1043)
        at kafka.log.UnifiedLog$$Lambda$2324/0x000800b59040.apply(Unknown Source)
        at scala.Option.foreach(Option.scala:437)
        at kafka.log.UnifiedLog.maybeIncrementLogStartOffset(UnifiedLog.scala:1043)
        - locked <0x000680fc5080> (a java.lang.Object)
        at kafka.cluster.Partition.$anonfun$deleteRecordsOnLeader$1(Partition.scala:1476)
        at kafka.cluster.Partition.deleteRecordsOnLeader(Partition.scala:1463)
        at kafka.server.ReplicaManager.$anonfun$deleteRecordsOnLocalLog$2(ReplicaManager.scala:687)
        at kafka.server.ReplicaManager$$Lambda$3156/0x000800d7c840.apply(Unknown Source)
        at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
        at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
        at scala.collection.mutable.HashMap.map(HashMap.scala:35)
        at kafka.server.ReplicaManager.deleteRecordsOnLocalLog(ReplicaManager.scala:680)
        at kafka.server.ReplicaManager.deleteRecords(ReplicaManager.scala:875)
        at kafka.server.KafkaApis.handleDeleteRecordsRequest(KafkaApis.scala:2216)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:196)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
        at java.lang.Thread.run(java.base@11.0.17/Thread.java:829)
{code}
 

LeaderEpoch checkpointing also calls fsync while holding Log#lock, which blocks 
request-handler threads from appending in the meantime.

This is called by the scheduler thread on log-segment breaching, so it might be 
less frequent than log rolls, though.

Does it make sense to also make the LeaderEpochCheckpointFile flush 
asynchronous?
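
For context on why this path is expensive, the top frames of the stack trace above (CheckpointFile.write, atomicMoveWithFallback, flushDir, FileChannelImpl.force) correspond to a write-temp-file, rename, then sync-directory pattern. Below is a rough Java sketch of that general pattern, not a copy of Kafka's CheckpointFile or Utils code; `AtomicCheckpointSketch`, the `.tmp` suffix, and the line-based payload are assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; this is not Kafka's CheckpointFile.
public class AtomicCheckpointSketch {

    public static void write(Path target, List<String> lines) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        byte[] payload = String.join("\n", lines).getBytes(StandardCharsets.UTF_8);

        // Step 1: write the new content to a temp file and fsync it.
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buf = ByteBuffer.wrap(payload);
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
            ch.force(true); // first blocking disk sync
        }

        // Step 2: atomically swap the temp file into place. Whether an existing
        // target is replaced is platform-specific; POSIX rename replaces it.
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);

        // Step 3: fsync the parent directory so the rename itself is durable.
        Path dir = target.toAbsolutePath().getParent();
        try (FileChannel dirCh = FileChannel.open(dir, StandardOpenOption.READ)) {
            dirCh.force(true); // second blocking disk sync
        } catch (IOException e) {
            // Some platforms cannot open a directory for sync; ignored in this sketch.
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("checkpoint-demo");
        Path target = dir.resolve("checkpoint");
        write(target, Arrays.asList("first", "second"));
        System.out.println(Files.readAllLines(target, StandardCharsets.UTF_8));
    }
}
```

Every call pays for two blocking syncs, so any lock held by the caller for the duration is held for at least that long when the disk is slow.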


[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-01 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728262#comment-17728262
 ] 

Haruki Okada commented on KAFKA-15046:
--

Oh, I hadn't noticed that there's another ticket and that a fix is already available.

Thank you, I will take a look!

> Produce performance issue under high disk load
> --
>
> Key: KAFKA-15046
> URL: https://issues.apache.org/jira/browse/KAFKA-15046
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.3.2
>Reporter: Haruki Okada
>Priority: Major
> Attachments: image-2023-06-01-12-46-30-058.png, 
> image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, 
> image-2023-06-01-12-56-19-108.png
>
>
> * Phenomenon:
>  ** !image-2023-06-01-12-46-30-058.png|width=259,height=236!
>  ** Producer response time 99%ile got quite bad when we performed replica 
> reassignment on the cluster
>  *** RequestQueue scope was significant
>  ** Request-time throttling also kicked in at the time of the incident, which 
> caused producers to delay sending messages in the meantime.
>  ** The disk I/O latency was higher than usual due to the high load for 
> replica reassignment.
>  *** !image-2023-06-01-12-56-19-108.png|width=255,height=128!
>  * Analysis:
>  ** The request-handler utilization was much higher than usual.
>  *** !image-2023-06-01-12-52-40-959.png|width=278,height=113!
>  ** Also, thread time utilization was much higher than usual on almost all 
> users
>  *** !image-2023-06-01-12-54-04-211.png|width=276,height=110!
>  ** We took jstack dumps several times; in most of them, one request-handler 
> was doing fsync to flush ProducerState while the other request-handlers were 
> waiting on Log#lock to append messages.
>  * 
>  ** 
>  *** 
> {code:java}
> "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 runnable  [0x7ef9a12e2000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native Method)
>         at sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
>         at sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
>         at kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451)
>         at kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754)
>         at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544)
>         - locked <0x00060d75d820> (a java.lang.Object)
>         at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523)
>         - locked <0x00060d75d820> (a java.lang.Object)
>         at kafka.log.UnifiedLog.append(UnifiedLog.scala:919)
>         - locked <0x00060d75d820> (a java.lang.Object)
>         at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
>         at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158)
>         at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956)
>         at kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown Source)
>         at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>         at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>         at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602)
>         at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:175)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
>         at java.lang.Thread.run(java.base@11.0.17/Thread.java:829)
> {code}
>  * 
>  ** There were also many log entries showing that writing producer snapshots 
> took hundreds of milliseconds.
>  *** 
> {code:java}
> ...
> [2023-05-01 11:08:36,689] INFO [ProducerStateManager partition=xxx-4] Wrote 
> producer snapshot at offset 1748817854 with 8 producer ids in 809 ms. 
> (kafka.log.ProducerStateManager)
> [2023-05-01 11:08:37,319] INFO [ProducerStateManager partition=yyy-34] Wrote 
> producer snapshot at offset 247996937813 with 0 producer ids in 547 ms. 
> (kafka.log.ProducerStateManager)
> [2023-05-01 11:08:38,887] INFO [ProducerStateManager partition=zzz-9] Wrote 
> producer snapshot at offset 226222355404 with 0 producer ids in 576 ms. 
> (kafka.log.ProducerStateManager)
> ... {code}
>  * From the analysis, we summarized the issue as 

[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-01 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728260#comment-17728260
 ] 

Divij Vaidya commented on KAFKA-15046:
--

Thank you for the investigation folks. We have an active PR right now [1] which 
makes producer snapshot flush to disk asynchronously. The thread blocking 
problem due to fsync will be resolved by it.

[1] https://github.com/apache/kafka/pull/13782
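
As an illustration only (this is not the code from the PR above), the general idea of moving fsync off the locked path can be sketched in Java as follows. The class AsyncSnapshotFlush, its methods, and the single-threaded flusher are hypothetical names chosen for this sketch; the plain Object lock merely stands in for Log#lock.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration; this is not Kafka's ProducerStateManager.
public class AsyncSnapshotFlush implements AutoCloseable {
    // Stands in for the per-log lock that append and roll contend on.
    private final Object lock = new Object();
    // Single background thread that performs the slow fsync calls.
    private final ExecutorService flusher = Executors.newSingleThreadExecutor();

    // Writes the payload while holding the lock, then hands the fsync to the
    // background thread so the lock is released before any disk sync happens.
    public Future<Void> takeSnapshot(Path file, byte[] payload) throws IOException {
        final FileChannel channel;
        synchronized (lock) {
            channel = writeWithoutSync(file, payload);
        }
        return flusher.submit(() -> {
            try (FileChannel ch = channel) {
                ch.force(true); // the potentially slow part, now off the locked path
            }
            return null;
        });
    }

    // Writes all bytes to the file but does not call force().
    private static FileChannel writeWithoutSync(Path file, byte[] payload) throws IOException {
        FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
        try {
            ByteBuffer buf = ByteBuffer.wrap(payload);
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
            return ch;
        } catch (IOException e) {
            ch.close(); // do not leak the channel if the write fails
            throw e;
        }
    }

    // Stops accepting new flushes; already submitted flushes still run.
    @Override
    public void close() {
        flusher.shutdown();
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("producer-snapshot", ".bin");
        try (AsyncSnapshotFlush snapshots = new AsyncSnapshotFlush()) {
            Future<Void> flushed = snapshots.takeSnapshot(file, new byte[] {1, 2, 3});
            flushed.get(); // wait only where durability is actually required
            System.out.println("bytes on disk: " + Files.size(file));
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

The trade-off in a design like this is that a crash before the background flush completes can leave the snapshot file not yet durable, so the caller has to tolerate a missing or incomplete snapshot.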


[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-05-31 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728214#comment-17728214
 ] 

Haruki Okada commented on KAFKA-15046:
--

If the suggestion (stop fsync-ing) makes sense, I'm happy to submit a patch.

> Produce performance issue under high disk load
> --
>
> Key: KAFKA-15046
> URL: https://issues.apache.org/jira/browse/KAFKA-15046
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 3.3.2
> Reporter: Haruki Okada
> Priority: Major
> Attachments: image-2023-06-01-12-46-30-058.png, 
> image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, 
> image-2023-06-01-12-56-19-108.png
>
>
> * Phenomenon:
>  ** !image-2023-06-01-12-46-30-058.png|width=259,height=236!
>  ** Producer response time (99th percentile) degraded badly when we performed 
> replica reassignment on the cluster
>  *** RequestQueue scope was significant
>  ** Request-time throttling also happens almost all the time, which caused 
> producers to delay sending messages at the time of the incident.
>  ** At the time of the incident, disk I/O latency was higher than usual 
> because of the high load from replica reassignment.
>  *** !image-2023-06-01-12-56-19-108.png|width=255,height=128!
>  * Analysis:
>  ** The request-handler utilization was much higher than usual.
>  *** !image-2023-06-01-12-52-40-959.png|width=278,height=113!
>  ** Also, thread time utilization was much higher than usual on almost all 
> users
>  *** !image-2023-06-01-12-54-04-211.png|width=276,height=110!
>  ** We took jstack dumps several times. In most of them, one 
> request-handler was doing fsync to flush ProducerState while the other 
> request-handlers were waiting on Log#lock to append messages.
>  * 
>  ** 
>  *** 
> {code:java}
> "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 
> cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 
> runnable  [0x7ef9a12e2000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native Method)
>         at sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
>         at sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
>         at kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451)
>         at kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754)
>         at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544)
>         - locked <0x00060d75d820> (a java.lang.Object)
>         at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523)
>         - locked <0x00060d75d820> (a java.lang.Object)
>         at kafka.log.UnifiedLog.append(UnifiedLog.scala:919)
>         - locked <0x00060d75d820> (a java.lang.Object)
>         at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
>         at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158)
>         at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956)
>         at kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown Source)
>         at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>         at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>         at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602)
>         at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:175)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
>         at java.lang.Thread.run(java.base@11.0.17/Thread.java:829) {code}
>  * 
>  ** There were also many log entries showing that writing producer snapshots 
> took hundreds of milliseconds.
>  *** 
> {code:java}
> ...
> [2023-05-01 11:08:36,689] INFO [ProducerStateManager partition=xxx-4] Wrote 
> producer snapshot at offset 1748817854 with 8 producer ids in 809 ms. 
> (kafka.log.ProducerStateManager)
> [2023-05-01 11:08:37,319] INFO [ProducerStateManager partition=yyy-34] Wrote 
> producer snapshot at offset 247996937813 with 0 producer ids in 547 ms. 
> (kafka.log.ProducerStateManager)
> [2023-05-01 11:08:38,887] INFO [ProducerStateManager partition=zzz-9] Wrote 
> producer snapshot at offset 226222355404 with 0 producer ids in 576 ms. 
> (kafka.log.ProducerStateManager)
> ... {code}
>  * From the analysis, we summarized the issue as below: