[jira] [Created] (KAFKA-16571) reassign_partitions_test.bounce_brokers should wait for messages to be sent to every partition

2024-04-16 Thread David Mao (Jira)
David Mao created KAFKA-16571:
-

 Summary: reassign_partitions_test.bounce_brokers should wait for 
messages to be sent to every partition
 Key: KAFKA-16571
 URL: https://issues.apache.org/jira/browse/KAFKA-16571
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao


This system test bounces brokers while produce traffic is ongoing. The test is also 
configured with rf=3 and min.isr=3, so if any broker is bounced before records have 
been produced to every partition, it is possible to run into OutOfOrderSequence 
exceptions similar to what is described in 
https://issues.apache.org/jira/browse/KAFKA-14359

When running the produce_consume_validate for the reassign_partitions_test, 
instead of waiting for 5 acked messages, we should wait for messages to be 
acked on the full set of partitions.
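
A minimal sketch of the "ack on every partition" idea using the plain Java producer 
(the actual system test drives the ducktape VerifiableProducer, so this only 
illustrates the condition to wait for; the helper name and 30-second timeout are 
assumptions):

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class WaitForAcksOnAllPartitions {
    // Hypothetical helper: block until at least one record has been acked on every
    // partition of the topic, so broker bounces cannot race with the first writes.
    static void waitForAckOnEveryPartition(KafkaProducer<byte[], byte[]> producer,
                                           String topic) throws Exception {
        int numPartitions = producer.partitionsFor(topic).size();
        for (int partition = 0; partition < numPartitions; partition++) {
            byte[] payload = ("probe-" + partition).getBytes(StandardCharsets.UTF_8);
            // Send to the explicit partition and block until the broker acks it.
            producer.send(new ProducerRecord<>(topic, partition, payload, payload))
                    .get(30, TimeUnit.SECONDS);
        }
    }
}
{code}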



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16395) Producer should refresh metadata on a socket request timeout

2024-03-20 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao resolved KAFKA-16395.
---
Resolution: Not A Bug

> Producer should refresh metadata on a socket request timeout
> 
>
> Key: KAFKA-16395
> URL: https://issues.apache.org/jira/browse/KAFKA-16395
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Mao
>Assignee: David Mao
>Priority: Critical
>
> I noticed in a set of producer logs that on a broker outage, we saw the 
> following sequence of logs:
> Got error produce response with correlation id 1661616 on topic-partition 
> topic-0, retrying (2147483646 attempts left). Error: REQUEST_TIMED_OUT. Error 
> Message: Disconnected from node 0 due to timeout
> Got error produce response with correlation id 1662093 on topic-partition 
> topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER
> Received invalid metadata error in produce request on partition topic-0 due 
> to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
> intended only for the leader, this error indicates that the broker is not the 
> current leader. For requests intended for any replica, this error indicates 
> that the broker is not a replica of the topic partition.. Going to request 
> metadata update now
> this implies we did not request metadata between our produce request 
> attempts. This is a regression introduced by 
> https://issues.apache.org/jira/browse/KAFKA-14317.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16395) Producer should refresh metadata on a socket request timeout

2024-03-20 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829311#comment-17829311
 ] 

David Mao commented on KAFKA-16395:
---

On a closer reading, I believe I misunderstood the code in KAFKA-14317, and this 
is probably not a bug. Closing the JIRA.

> Producer should refresh metadata on a socket request timeout
> 
>
> Key: KAFKA-16395
> URL: https://issues.apache.org/jira/browse/KAFKA-16395
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Mao
>Assignee: David Mao
>Priority: Critical
>
> I noticed in a set of producer logs that on a broker outage, we saw the 
> following sequence of logs:
> Got error produce response with correlation id 1661616 on topic-partition 
> topic-0, retrying (2147483646 attempts left). Error: REQUEST_TIMED_OUT. Error 
> Message: Disconnected from node 0 due to timeout
> Got error produce response with correlation id 1662093 on topic-partition 
> topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER
> Received invalid metadata error in produce request on partition topic-0 due 
> to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
> intended only for the leader, this error indicates that the broker is not the 
> current leader. For requests intended for any replica, this error indicates 
> that the broker is not a replica of the topic partition.. Going to request 
> metadata update now
> this implies we did not request metadata between our produce request 
> attempts. This is a regression introduced by 
> https://issues.apache.org/jira/browse/KAFKA-14317.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16395) Producer should refresh metadata on a socket request timeout

2024-03-20 Thread David Mao (Jira)
David Mao created KAFKA-16395:
-

 Summary: Producer should refresh metadata on a socket request 
timeout
 Key: KAFKA-16395
 URL: https://issues.apache.org/jira/browse/KAFKA-16395
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao
Assignee: David Mao


I noticed in a set of producer logs that on a broker outage, we saw the 
following sequence of logs:

Got error produce response with correlation id 1661616 on topic-partition 
topic-0, retrying (2147483646 attempts left). Error: REQUEST_TIMED_OUT. Error 
Message: Disconnected from node 0 due to timeout

Got error produce response with correlation id 1662093 on topic-partition 
topic-0, retrying (2147483646 attempts left). Error: NOT_LEADER_OR_FOLLOWER

Received invalid metadata error in produce request on partition topic-0 due to 
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
intended only for the leader, this error indicates that the broker is not the 
current leader. For requests intended for any replica, this error indicates 
that the broker is not a replica of the topic partition.. Going to request 
metadata update now

This implies we did not request metadata between our produce request attempts. 
This is a regression introduced by 
https://issues.apache.org/jira/browse/KAFKA-14317.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16121) Partition reassignments in ZK migration dual write mode stalled until leader epoch incremented

2024-01-12 Thread David Mao (Jira)
David Mao created KAFKA-16121:
-

 Summary: Partition reassignments in ZK migration dual write mode 
stalled until leader epoch incremented
 Key: KAFKA-16121
 URL: https://issues.apache.org/jira/browse/KAFKA-16121
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao


I noticed this in an integration test in 
https://github.com/apache/kafka/pull/15184

In ZK mode, partition leaders rely on the LeaderAndIsr request to be notified of 
new replicas as part of a reassignment, and they ignore any LeaderAndIsr request 
whose partition leader epoch is less than or equal to the current partition leader 
epoch.

In KRaft mode, we do not bump the leader epoch when starting a new reassignment 
(see `triggerLeaderEpochBumpIfNeeded`). This means that the leader will ignore the 
LeaderAndIsr request initiating the reassignment until a leader epoch bump is 
triggered through another means, for instance preferred leader election.
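
A rough sketch of the stale-epoch guard in Java (illustrative only; the real check 
lives in the ZK-mode broker code, which is Scala, and the names here are made up):

{code:java}
public class LeaderEpochGuard {
    // Illustrative only. If the controller starts a reassignment without bumping the
    // leader epoch, requestLeaderEpoch == currentLeaderEpoch, so the leader drops the
    // LeaderAndIsr request and never learns about the new replicas until a later bump.
    static boolean shouldApplyLeaderAndIsr(int requestLeaderEpoch, int currentLeaderEpoch) {
        return requestLeaderEpoch > currentLeaderEpoch;
    }
}
{code}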



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16120) Partition reassignments in ZK migration dual write leaves stray partitions

2024-01-12 Thread David Mao (Jira)
David Mao created KAFKA-16120:
-

 Summary: Partition reassignments in ZK migration dual write leaves 
stray partitions
 Key: KAFKA-16120
 URL: https://issues.apache.org/jira/browse/KAFKA-16120
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao


When a reassignment is completed in ZK migration dual-write mode, the 
`StopReplica` request sent by the KRaft quorum migration propagator while 
processing the topic delta carries `delete = false` for the removed replicas. 
This results in stray replicas.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15780) Wait for consistent kraft metadata when creating topics in tests

2023-11-02 Thread David Mao (Jira)
David Mao created KAFKA-15780:
-

 Summary: Wait for consistent kraft metadata when creating topics 
in tests
 Key: KAFKA-15780
 URL: https://issues.apache.org/jira/browse/KAFKA-15780
 Project: Kafka
  Issue Type: Test
Reporter: David Mao


Tests occasionally flake in KRaft mode when they do not retry on stale metadata.
I suspect the root cause is that TestUtils.createTopicWithAdmin waits for the 
partitions to be present in the metadata cache but does not wait for the metadata 
to be fully published to the broker.
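
A hedged sketch of the kind of wait this implies, written against the Java Admin 
client (the real helper is TestUtils.createTopicWithAdmin in Scala, and the exact 
"fully published" condition to poll for is an assumption):

{code:java}
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;

public class WaitForTopicMetadata {
    // Poll until the topic is describable with the expected partition count, or time out.
    static void waitForTopic(Admin admin, String topic, int expectedPartitions) throws Exception {
        long deadlineMs = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30);
        while (System.currentTimeMillis() < deadlineMs) {
            try {
                TopicDescription description = admin.describeTopics(Collections.singleton(topic))
                        .allTopicNames().get().get(topic);
                if (description.partitions().size() == expectedPartitions)
                    return;
            } catch (Exception e) {
                // Metadata not yet published to the broker we hit; retry.
            }
            Thread.sleep(100);
        }
        throw new AssertionError("Timed out waiting for metadata of topic " + topic);
    }
}
{code}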



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15526) Simplify LogAppendInfo parameters

2023-10-02 Thread David Mao (Jira)
David Mao created KAFKA-15526:
-

 Summary: Simplify LogAppendInfo parameters
 Key: KAFKA-15526
 URL: https://issues.apache.org/jira/browse/KAFKA-15526
 Project: Kafka
  Issue Type: Improvement
Reporter: David Mao
Assignee: David Mao


Currently LogAppendInfo is quite overloaded, carrying a lot of redundant 
information. This makes some of the code in the log layer unnecessarily complex, 
since the log layer is unsure which fields it needs to populate for higher layers, 
and higher layers are unsure which fields need to bubble back to clients.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15344) Kafka Streams should include the message leader epoch when committing offsets

2023-08-15 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-15344:
--
Description: 
We noticed an application received an OFFSET_OUT_OF_RANGE error following a 
network partition and streams task rebalance and subsequently reset its offsets 
to the beginning.

Inspecting the logs, we saw multiple consumer log messages like: 

{code:java}
Setting offset for partition tp to the committed offset 
FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
{code}

Inspecting the streams code, it looks like Kafka Streams calls `commitSync` 
passing an explicit OffsetAndMetadata object but does not populate the offset 
leader epoch.

The offset leader epoch is required in the offset commit to ensure that all 
consumers in the consumer group have coherent metadata before fetching. 
Otherwise after a consumer group rebalance, a consumer may fetch with a stale 
leader epoch with respect to the committed offset and get an offset out of 
range error from a zombie partition leader.

An example of where this can cause issues:
1. We have a consumer group with consumer 1 and consumer 2. Partition P is 
assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has 
stale metadata for P.
2. Consumer 1 fetches partition P at offset 50, epoch 8, and commits offset 50 
without an epoch.
3. The consumer group rebalances and P is now assigned to consumer 2. Consumer 
2 has a stale leader epoch for P (let's say leader epoch 7). Consumer 2 will 
now try to fetch with leader epoch 7, offset 50. If we have a zombie leader due 
to a network partition, the zombie leader may accept consumer 2's fetch leader 
epoch and return an OFFSET_OUT_OF_RANGE to consumer 2.

If in step 1, consumer 1 committed the leader epoch for the message, then when 
consumer 2 receives assignment P it would force a metadata refresh to discover 
a sufficiently new leader epoch for the committed offset.

The low-hanging fruit fix would be to have streams pass in the message epoch 
for each commit. Another fix discussed with [~hachikuji] is to have the 
consumer cache leader epoch ranges, similar to how the broker maintains a 
leader epoch cache.


  was:
We noticed an application received an OFFSET_OUT_OF_RANGE error following a 
network partition and streams task rebalance and subsequently reset its offsets 
to the beginning.

Inspecting the logs, we saw multiple consumer log messages like: 

{code:java}
Setting offset for partition tp to the committed offset 
FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
{code}

Inspecting the streams code, it looks like kafka streams calls `commitSync` 
passing through an explicit OffsetAndMetadata object but does not populate the 
offset leader epoch.

The offset leader epoch is required in the offset commit to ensure that all 
consumers in the consumer group have coherent metadata before fetching. 
Otherwise after a consumer group rebalance, a consumer may fetch with a stale 
leader epoch with respect to the committed offset and get an offset out of 
range error from a zombie partition leader.

The low-hanging fruit fix would be to have streams pass in the message epoch 
for each commit. Another fix discussed with [~hachikuji] is to have the 
consumer cache leader epoch ranges, similar to how the broker maintains a 
leader epoch cache.



> Kafka Streams should include the message leader epoch when committing offsets
> -
>
> Key: KAFKA-15344
> URL: https://issues.apache.org/jira/browse/KAFKA-15344
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: David Mao
>Priority: Major
>
> We noticed an application received an OFFSET_OUT_OF_RANGE error following a 
> network partition and streams task rebalance and subsequently reset its 
> offsets to the beginning.
> Inspecting the logs, we saw multiple consumer log messages like: 
> {code:java}
> Setting offset for partition tp to the committed offset 
> FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
> {code}
> Inspecting the streams code, it looks like kafka streams calls `commitSync` 
> passing through an explicit OffsetAndMetadata object but does not populate 
> the offset leader epoch.
> The offset leader epoch is required in the offset commit to ensure that all 
> consumers in the consumer group have coherent metadata before fetching. 
> Otherwise after a consumer group rebalance, a consumer may fetch with a stale 
> leader epoch with respect to the committed offset and get an offset out of 
> range error from a zombie partition leader.
> An example of where this can cause issues:
> 1. We have a consumer group with consumer 1 and consumer 2. Partition P is 
> assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has 

[jira] [Created] (KAFKA-15344) Kafka Streams should include the message leader epoch when committing offsets

2023-08-15 Thread David Mao (Jira)
David Mao created KAFKA-15344:
-

 Summary: Kafka Streams should include the message leader epoch 
when committing offsets
 Key: KAFKA-15344
 URL: https://issues.apache.org/jira/browse/KAFKA-15344
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao


We noticed an application received an OFFSET_OUT_OF_RANGE error following a 
network partition and streams task rebalance and subsequently reset its offsets 
to the beginning.

Inspecting the logs, we saw multiple consumer log messages like: 

{code:java}
Setting offset for partition tp to the committed offset 
FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
{code}

Inspecting the streams code, it looks like Kafka Streams calls `commitSync` 
passing an explicit OffsetAndMetadata object but does not populate the offset 
leader epoch.

The offset leader epoch is required in the offset commit to ensure that all 
consumers in the consumer group have coherent metadata before fetching. 
Otherwise after a consumer group rebalance, a consumer may fetch with a stale 
leader epoch with respect to the committed offset and get an offset out of 
range error from a zombie partition leader.

The low-hanging fruit fix would be to have streams pass in the message epoch 
for each commit. Another fix discussed with [~hachikuji] is to have the 
consumer cache leader epoch ranges, similar to how the broker maintains a 
leader epoch cache.
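
For reference, the "low-hanging fruit" direction expressed against the public 
consumer API (this is not the actual Streams patch, just a sketch of committing 
the record's leader epoch alongside the offset; the class and method names are 
made up):

{code:java}
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitWithLeaderEpoch {
    // Commit the offset after this record, carrying the record's leader epoch so that
    // the next owner of the partition must refresh metadata to at least this epoch
    // before fetching from the committed offset.
    static void commit(Consumer<byte[], byte[]> consumer, ConsumerRecord<byte[], byte[]> record) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        OffsetAndMetadata committed =
                new OffsetAndMetadata(record.offset() + 1, record.leaderEpoch(), "");
        consumer.commitSync(Collections.singletonMap(tp, committed));
    }
}
{code}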




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15220) KRaftMetadataCache returns fenced brokers from getAliveBrokerNode

2023-07-19 Thread David Mao (Jira)
David Mao created KAFKA-15220:
-

 Summary: KRaftMetadataCache returns fenced brokers from 
getAliveBrokerNode
 Key: KAFKA-15220
 URL: https://issues.apache.org/jira/browse/KAFKA-15220
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao
Assignee: David Mao






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15192) Network thread receives exception when updating request metrics

2023-07-14 Thread David Mao (Jira)
David Mao created KAFKA-15192:
-

 Summary: Network thread receives exception when updating request 
metrics
 Key: KAFKA-15192
 URL: https://issues.apache.org/jira/browse/KAFKA-15192
 Project: Kafka
  Issue Type: Bug
  Components: metrics
Reporter: David Mao


We noticed an exception being thrown from the network threads when updating 
some of the request histograms. Example stack trace:

 
java.util.NoSuchElementException
at 
java.util.concurrent.ConcurrentSkipListMap.firstKey(ConcurrentSkipListMap.java:2064)
at 
com.yammer.metrics.stats.ExponentiallyDecayingSample.update(ExponentiallyDecayingSample.java:102)
at 
com.yammer.metrics.stats.ExponentiallyDecayingSample.update(ExponentiallyDecayingSample.java:81)
at com.yammer.metrics.core.Histogram.update(Histogram.java:110)
 

Searching for the error, I found a similar ticket resolved in Cassandra by updating 
their dropwizard dependency to pull in 
[https://github.com/dropwizard/metrics/pull/1436]: 
https://issues.apache.org/jira/browse/CASSANDRA-15472

Kafka currently still uses Yammer metrics, so we would need to take 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-510%3A+Metrics+library+upgrade]
forward to upgrade to a dropwizard version that fixes this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15044) Snappy v.1.1.9.1 NoClassDefFound on ARM machines

2023-05-31 Thread David Mao (Jira)
David Mao created KAFKA-15044:
-

 Summary: Snappy v.1.1.9.1 NoClassDefFound on ARM machines
 Key: KAFKA-15044
 URL: https://issues.apache.org/jira/browse/KAFKA-15044
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.5.0
Reporter: David Mao
Assignee: David Mao


We upgraded our Snappy dependency, but v1.1.9.1 has compatibility issues with 
ARM. We should upgrade to v1.1.10.0, which resolves this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14990) Dynamic producer ID expiration is not applied on broker restart

2023-05-11 Thread David Mao (Jira)
David Mao created KAFKA-14990:
-

 Summary: Dynamic producer ID expiration is not applied on broker 
restart
 Key: KAFKA-14990
 URL: https://issues.apache.org/jira/browse/KAFKA-14990
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao
Assignee: David Mao






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14295) FetchMessageConversionsPerSec meter not recorded

2022-10-12 Thread David Mao (Jira)
David Mao created KAFKA-14295:
-

 Summary: FetchMessageConversionsPerSec meter not recorded
 Key: KAFKA-14295
 URL: https://issues.apache.org/jira/browse/KAFKA-14295
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao


The broker topic metric FetchMessageConversionsPerSec doesn't get recorded on a 
fetch message conversion.

The bug is that we pass in a callback that expects a MultiRecordsSend in 
KafkaApis:
{code:java}
def updateConversionStats(send: Send): Unit = {
  send match {
    case send: MultiRecordsSend if send.recordConversionStats != null =>
      send.recordConversionStats.asScala.toMap.foreach {
        case (tp, stats) => updateRecordConversionStats(request, tp, stats)
      }
    case _ =>
  }
} {code}
But we call this callback with a NetworkSend in the SocketServer:
{code:java}
selector.completedSends.forEach { send =>
  try {
    val response = inflightResponses.remove(send.destinationId).getOrElse {
      throw new IllegalStateException(s"Send for ${send.destinationId} completed, but not in `inflightResponses`")
    }
    updateRequestMetrics(response)

    // Invoke send completion callback
    response.onComplete.foreach(onComplete => onComplete(send))
    ...{code}
Note that Selector.completedSends returns a collection of NetworkSend, so the 
MultiRecordsSend case in updateConversionStats never matches.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14144) AlterPartition is not idempotent when requests time out

2022-08-04 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-14144:
--
Affects Version/s: 3.3.0

> AlterPartition is not idempotent when requests time out
> ---
>
> Key: KAFKA-14144
> URL: https://issues.apache.org/jira/browse/KAFKA-14144
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: David Mao
>Priority: Blocker
>
> [https://github.com/apache/kafka/pull/12032] changed the validation order of 
> AlterPartition requests to fence requests with a stale partition epoch before 
> we compare the leader and ISR contents.
> This results in a loss of idempotency if a leader does not receive an 
> AlterPartition response because retries will receive an 
> INVALID_UPDATE_VERSION error.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14144) AlterPartition is not idempotent when requests time out

2022-08-04 Thread David Mao (Jira)
David Mao created KAFKA-14144:
-

 Summary: AlterPartition is not idempotent when requests time out
 Key: KAFKA-14144
 URL: https://issues.apache.org/jira/browse/KAFKA-14144
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao


[https://github.com/apache/kafka/pull/12032] changed the validation order of 
AlterPartition requests to fence requests with a stale partition epoch before 
we compare the leader and ISR contents.

This results in a loss of idempotency if a leader does not receive an 
AlterPartition response because retries will receive an INVALID_UPDATE_VERSION 
error.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14025) Request timeout includes time queued per connection

2022-06-26 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-14025:
--
Description: 
In the NetworkClient we measure request timeouts from when a request is first 
queued in {{InFlightRequests}}. A Kafka broker currently processes at most one 
in-flight request at a time per connection. This disconnect between client-side 
request timeout enforcement and broker-side request processing means that an 
aggressive request timeout can result in spurious timeouts when a connection has 
multiple requests in-flight.

Example:

broker 0 has avg 25 ms latency, producer 1 has a request timeout of 50 ms.

t=0 p1 sends r1, b0 starts processing r1

t=5 p1 sends r2

t=10 p1 sends r3

t=25 b0 sends response for r1, starts processing r2

t=50 b0 sends response for r2, starts processing r3

t=60 client times out r3, despite the broker only having 10 ms or so to process 
the request.

 

Instead, we should be measuring request timeouts only once previous requests 
have been responded to.

  was:
In the NetworkClient we measure request timeouts from when a request is first 
queued in 

{{{}InflightRequests{}}}. A Kafka broker currently only processes at most one 
in-flight request at a time per connection. This disconnect between client-side 
request timeout enforcement and broker-side request processing means that an 
aggressive request timeout can result in spurious timeouts when a connection 
has multiple requests in-flight.

Example:

broker 0 has avg 25 ms latency, producer 1 has a request timeout of 50 ms.

t=0 p1 sends r1, b0 starts processing r1

t=5 p1 sends r2

t=10 p1 sends r3

t=25 b0 sends response for r1, starts processing r2

t=50 b0 sends response for r2, starts processing r3

t=60 client times out r3

 

Instead, we should be measuring request timeouts only once previous requests 
have been responded to.


> Request timeout includes time queued per connection
> ---
>
> Key: KAFKA-14025
> URL: https://issues.apache.org/jira/browse/KAFKA-14025
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Mao
>Priority: Major
>
> In the NetworkClient we measure request timeouts from when a request is first 
> queued in 
> {{{}InflightRequests{}}}. A Kafka broker currently only processes at most one 
> in-flight request at a time per connection. This disconnect between 
> client-side request timeout enforcement and broker-side request processing 
> means that an aggressive request timeout can result in spurious timeouts when 
> a connection has multiple requests in-flight.
> Example:
> broker 0 has avg 25 ms latency, producer 1 has a request timeout of 50 ms.
> t=0 p1 sends r1, b0 starts processing r1
> t=5 p1 sends r2
> t=10 p1 sends r3
> t=25 b0 sends response for r1, starts processing r2
> t=50 b0 sends response for r2, starts processing r3
> t=60 client times out r3, despite the broker only having 10 ms or so to 
> process the request.
>  
> Instead, we should be measuring request timeouts only once previous requests 
> have been responded to.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-14025) Request timeout includes time queued per connection

2022-06-26 Thread David Mao (Jira)
David Mao created KAFKA-14025:
-

 Summary: Request timeout includes time queued per connection
 Key: KAFKA-14025
 URL: https://issues.apache.org/jira/browse/KAFKA-14025
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao


In the NetworkClient we measure request timeouts from when a request is first 
queued in {{InFlightRequests}}. A Kafka broker currently processes at most one 
in-flight request at a time per connection. This disconnect between client-side 
request timeout enforcement and broker-side request processing means that an 
aggressive request timeout can result in spurious timeouts when a connection has 
multiple requests in-flight.

Example:

broker 0 has avg 25 ms latency, producer 1 has a request timeout of 50 ms.

t=0 p1 sends r1, b0 starts processing r1

t=5 p1 sends r2

t=10 p1 sends r3

t=25 b0 sends response for r1, starts processing r2

t=50 b0 sends response for r2, starts processing r3

t=60 client times out r3

 

Instead, we should be measuring request timeouts only once previous requests 
have been responded to.
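
A sketch of the proposed accounting (the names are made up; the real change would 
live in NetworkClient/InFlightRequests): start the per-request timeout clock only 
once the connection has finished responding to the requests queued ahead of it.

{code:java}
public class QueuedRequestTimeout {
    // Illustrative only. sendTimeMs is when the request was queued on the connection;
    // lastResponseTimeMs is when the previous in-flight request was answered. Counting
    // the timeout from the later of the two means time spent queued behind earlier
    // requests does not eat into this request's timeout budget.
    static long requestDeadlineMs(long sendTimeMs, long lastResponseTimeMs, long requestTimeoutMs) {
        return Math.max(sendTimeMs, lastResponseTimeMs) + requestTimeoutMs;
    }
}
{code}

In the example above, r3 is queued at t=10 but r2 is answered at t=50, so r3's 
deadline would be 50 + 50 = 100 and the spurious timeout at t=60 would not fire.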



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (KAFKA-13623) Memory leak when multiple poll

2022-01-30 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17484366#comment-17484366
 ] 

David Mao edited comment on KAFKA-13623 at 1/30/22, 2:40 PM:
-

Direct memory is allocated when Java applications perform I/O with heap byte 
buffers. Since you're not calling poll in the second example, it's expected 
that direct memory wouldn't be allocated.

It's a little odd that the direct memory allocation keeps growing, since the 
direct memory is tied to the lifetime of a thread. It may be that a full GC is 
needed for the direct memory to get cleaned up.

What is killing the process here? Can you describe the application environment?

Likewise, what JVM args are you running?


was (Author: david.mao):
Can you add

*-XX:+HeapDumpOnOutOfMemoryError*

to the consumer app java args and upload the heap dump?

> Memory leak when multiple poll
> --
>
> Key: KAFKA-13623
> URL: https://issues.apache.org/jira/browse/KAFKA-13623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.1, 2.8.1
>Reporter: Emanuel Velzi
>Priority: Major
>
> Hi, I'm experiencing a kind of memory leak with this simple consumer.
> Some info before the code:
>     - kafka-clients.version: I try with 2.4.1 and 2.8.1
> I only set these properties:
>     - bootstrap.servers: my-servers
>     - group.id: my-group-id
>     - auto.offset.reset: earliest
>     - enable.auto.commit: false
>     - heartbeat.interval.ms: 300
> My topic has NUM_PARTITIONS=48 partitions:
> {code:java}
> public class Test {
>     /* ... */
>     public void start() {
>         for (int i = 0; i < NUM_PARTITIONS; i++) {
>             startOne();
>         }
>     }
>     public void startOne() {
>         LOGGER.info("startOne");
>         this.pool.submit(this::startConsumer);
>     }
>     public void startConsumer() {
>         var consumer = new KafkaConsumer<>(this.kafkaConfiguration, 
> this.stringDeserializer, this.stringDeserializer);
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             consumer.poll(Duration.ofSeconds(30));
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error!");
>         } finally {
>             consumer.close();
>         }
>         scheduleNewConsumer();
>     }
>     private void scheduleNewConsumer() {
>         scheduledExecutorService.schedule(() -> startOne(), 
> Duration.ofSeconds(2).toMillis(), TimeUnit.MILLISECONDS);
>     }
> }
> {code}
>  
> In summary, when I have some error processing a record, I close the 
> consumer and retry, starting a new one. 
> At that moment the direct memory used by the Java process starts to grow 
> indefinitely, until the process is killed.
> I tried some other strategies. For example:
>  - not closing the consumer, and reusing it with a seek(..)
>  - not closing the consumer, and reusing it doing:  consumer.unsubscribe(); and 
> consumer.subscribe(..);
> In both cases the memory leak was slower, but it happened anyway.
> Also I tried this:
> {code:java}
> public void startConsumer(Consumer consumer) {
>  /*always using the same consumer*/
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             // NO POLL HERE: consumer.poll(Duration.ofSeconds(30));
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error!");
>         } finally {
>             consumer.unsubscribe();
>             consumer.subscribe(Collections.singletonList(this.topic));
>         }
>         scheduleNewConsumer();
>     }{code}
>  
> I mean, multiple times I'm subscribing and unsubscribing the consumer 
> without polling anything. In those cases I don't experience the memory leak. So, 
> I imagine that the problem is the poll itself.
> Someone can help me with this please?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13623) Memory leak when multiple poll

2022-01-30 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17484366#comment-17484366
 ] 

David Mao commented on KAFKA-13623:
---

Can you add

*-XX:+HeapDumpOnOutOfMemoryError*

to the consumer app java args and upload the heap dump?

> Memory leak when multiple poll
> --
>
> Key: KAFKA-13623
> URL: https://issues.apache.org/jira/browse/KAFKA-13623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.1, 2.8.1
>Reporter: Emanuel Velzi
>Priority: Major
>
> Hi, I'm experiencing a kind of memory leak with this simple consumer.
> Some info before the code:
>     - kafka-clients.version: I try with 2.4.1 and 2.8.1
> I only set these properties:
>     - bootstrap.servers: my-servers
>     - group.id: my-group-id
>     - auto.offset.reset: earliest
>     - enable.auto.commit: false
>     - heartbeat.interval.ms: 300
> My topic has NUM_PARTITIONS=48 partitions:
> {code:java}
> public class Test {
>     /* ... */
>     public void start() {
>         for (int i = 0; i < NUM_PARTITIONS; i++) {
>             startOne();
>         }
>     }
>     public void startOne() {
>         LOGGER.info("startOne");
>         this.pool.submit(this::startConsumer);
>     }
>     public void startConsumer() {
>         var consumer = new KafkaConsumer<>(this.kafkaConfiguration, 
> this.stringDeserializer, this.stringDeserializer);
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             consumer.poll(Duration.ofSeconds(30));
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error!");
>         } finally {
>             consumer.close();
>         }
>         scheduleNewConsumer();
>     }
>     private void scheduleNewConsumer() {
>         scheduledExecutorService.schedule(() -> startOne(), 
> Duration.ofSeconds(2).toMillis(), TimeUnit.MILLISECONDS);
>     }
> }
> {code}
>  
> In summary, when I have some error processing a record, I close the 
> consumer and retry, starting a new one. 
> At that moment the direct memory used by the Java process starts to grow 
> indefinitely, until the process is killed.
> I tried some other strategies. For example:
>  - not closing the consumer, and reusing it with a seek(..)
>  - not closing the consumer, and reusing it doing:  consumer.unsubscribe(); and 
> consumer.subscribe(..);
> In both cases the memory leak was slower, but it happened anyway.
> Also I tried this:
> {code:java}
> public void startConsumer(Consumer consumer) {
>  /*always using the same consumer*/
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             // NO POLL HERE: consumer.poll(Duration.ofSeconds(30));
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error!");
>         } finally {
>             consumer.unsubscribe();
>             consumer.subscribe(Collections.singletonList(this.topic));
>         }
>         scheduleNewConsumer();
>     }{code}
>  
> I mean, multiple times I'm subscribing and unsubscribing the consumer 
> without polling anything. In those cases I don't experience the memory leak. So, 
> I imagine that the problem is the poll itself.
> Someone can help me with this please?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13614) Leader replication quota is applied to consumer fetches

2022-01-24 Thread David Mao (Jira)
David Mao created KAFKA-13614:
-

 Summary: Leader replication quota is applied to consumer fetches
 Key: KAFKA-13614
 URL: https://issues.apache.org/jira/browse/KAFKA-13614
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: David Mao
Assignee: David Mao


In ReplicaManager.readFromLocalLog we check shouldLeaderThrottle regardless of 
whether the read is coming from a consumer or a follower broker. This results in 
the replication quota being applied to consumer fetches.
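
An illustrative sketch of the intended condition (the real code is Scala in 
ReplicaManager.readFromLocalLog, and the parameter names here are assumptions): 
the leader-side replication throttle should only be consulted for follower fetches.

{code:java}
public class LeaderThrottleCheck {
    // Illustrative only: the leader replication quota should gate follower replication
    // traffic, never ordinary consumer fetches.
    static boolean shouldLeaderThrottle(boolean fetchIsFromFollower, boolean partitionThrottled,
                                        boolean quotaExceeded) {
        return fetchIsFromFollower && partitionThrottled && quotaExceeded;
    }
}
{code}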



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2021-12-10 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17457216#comment-17457216
 ] 

David Mao edited comment on KAFKA-13388 at 12/10/21, 4:43 PM:
--

We should probably bump up the priority of this Jira to Major or Critical since 
this prevents a producer from being able to recover its connection to a node 
until it restarts, or the connection gets idle-killed. I'm not sure what the 
impact is on the consumer or admin client.


was (Author: david.mao):
We should probably bump up the priority of this Jira to Major or Critical since 
this prevents a producer from being able to recover its connection to a node 
until it restarts, or the connection gets idle-killed.

> Kafka Producer nodes stuck in CHECKING_API_VERSIONS
> ---
>
> Key: KAFKA-13388
> URL: https://issues.apache.org/jira/browse/KAFKA-13388
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: David Hoffman
>Priority: Critical
> Attachments: Screen Shot 2021-10-25 at 10.28.48 AM.png, 
> image-2021-10-21-13-42-06-528.png
>
>
> I have been seeing expired batch errors in my app.
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for 
> xxx-17:120002 ms has passed since batch creation
> {code}
>  I would have assumed a request timout or connection timeout should have also 
> been logged. I could not find any other associated errors. 
> I added some instrumenting to my app and have traced this down to broker 
> connections hanging in CHECKING_API_VERSIONS state. -It appears there is no 
> effective timeout for Kafka Producer broker connections in 
> CHECKING_API_VERSIONS state.-
> In the code see the after the NetworkClient connects to a broker node it 
> makes a request to check api versions, when it receives the response it marks 
> the node as ready. -I am seeing that sometimes a reply is not received for 
> the check api versions request the connection just hangs in 
> CHECKING_API_VERSIONS state until it is disposed I assume after the idle 
> connection timeout.-
> Update: not actually sure what causes the connection to get stuck in 
> CHECKING_API_VERSIONS.
> -I am guessing the connection setup timeout should be still in play for this, 
> but it is not.- 
>  -There is a connectingNodes set that is consulted when checking timeouts and 
> the node is removed- 
>  -when ClusterConnectionStates.checkingApiVersions(String id) is called to 
> transition the node into CHECKING_API_VERSIONS-



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2021-12-10 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17457216#comment-17457216
 ] 

David Mao commented on KAFKA-13388:
---

We should probably bump up the priority of this Jira to Major or Critical since 
this prevents a producer from being able to recover its connection to a node 
until it restarts, or the connection gets idle-killed.

> Kafka Producer nodes stuck in CHECKING_API_VERSIONS
> ---
>
> Key: KAFKA-13388
> URL: https://issues.apache.org/jira/browse/KAFKA-13388
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: David Hoffman
>Priority: Minor
> Attachments: Screen Shot 2021-10-25 at 10.28.48 AM.png, 
> image-2021-10-21-13-42-06-528.png
>
>
> I have been seeing expired batch errors in my app.
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for 
> xxx-17:120002 ms has passed since batch creation
> {code}
>  I would have assumed a request timout or connection timeout should have also 
> been logged. I could not find any other associated errors. 
> I added some instrumenting to my app and have traced this down to broker 
> connections hanging in CHECKING_API_VERSIONS state. -It appears there is no 
> effective timeout for Kafka Producer broker connections in 
> CHECKING_API_VERSIONS state.-
> In the code see the after the NetworkClient connects to a broker node it 
> makes a request to check api versions, when it receives the response it marks 
> the node as ready. -I am seeing that sometimes a reply is not received for 
> the check api versions request the connection just hangs in 
> CHECKING_API_VERSIONS state until it is disposed I assume after the idle 
> connection timeout.-
> Update: not actually sure what causes the connection to get stuck in 
> CHECKING_API_VERSIONS.
> -I am guessing the connection setup timeout should be still in play for this, 
> but it is not.- 
>  -There is a connectingNodes set that is consulted when checking timeouts and 
> the node is removed- 
>  -when ClusterConnectionStates.checkingApiVersions(String id) is called to 
> transition the node into CHECKING_API_VERSIONS-



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2021-12-10 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-13388:
--
Priority: Critical  (was: Minor)

> Kafka Producer nodes stuck in CHECKING_API_VERSIONS
> ---
>
> Key: KAFKA-13388
> URL: https://issues.apache.org/jira/browse/KAFKA-13388
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: David Hoffman
>Priority: Critical
> Attachments: Screen Shot 2021-10-25 at 10.28.48 AM.png, 
> image-2021-10-21-13-42-06-528.png
>
>
> I have been seeing expired batch errors in my app.
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for 
> xxx-17:120002 ms has passed since batch creation
> {code}
>  I would have assumed a request timout or connection timeout should have also 
> been logged. I could not find any other associated errors. 
> I added some instrumenting to my app and have traced this down to broker 
> connections hanging in CHECKING_API_VERSIONS state. -It appears there is no 
> effective timeout for Kafka Producer broker connections in 
> CHECKING_API_VERSIONS state.-
> In the code see the after the NetworkClient connects to a broker node it 
> makes a request to check api versions, when it receives the response it marks 
> the node as ready. -I am seeing that sometimes a reply is not received for 
> the check api versions request the connection just hangs in 
> CHECKING_API_VERSIONS state until it is disposed I assume after the idle 
> connection timeout.-
> Update: not actually sure what causes the connection to get stuck in 
> CHECKING_API_VERSIONS.
> -I am guessing the connection setup timeout should be still in play for this, 
> but it is not.- 
>  -There is a connectingNodes set that is consulted when checking timeouts and 
> the node is removed- 
>  -when ClusterConnectionStates.checkingApiVersions(String id) is called to 
> transition the node into CHECKING_API_VERSIONS-



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2021-12-09 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17456854#comment-17456854
 ] 

David Mao edited comment on KAFKA-13388 at 12/10/21, 2:10 AM:
--

[~dhofftgt] 

Looking at where the NetworkClient enters the CHECKING_API_VERSIONS state, we 
see:

 
{code:java}
if (discoverBrokerVersions) {
 this.connectionStates.checkingApiVersions(node); 
 nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
 {code}
which is a separate queue for nodes needing to send the api versions request.

Then in 

 
{code:java}
private void handleInitiateApiVersionRequests(long now) {
    Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter =
        nodesNeedingApiVersionsFetch.entrySet().iterator();
    while (iter.hasNext()) {
        Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
        String node = entry.getKey();
        if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
            log.debug("Initiating API versions fetch from node {}.", node);
            ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
            ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
            doSend(clientRequest, true, now);
            iter.remove();
        }{code}
we only send out the api versions request if the channel is ready (TLS 
handshake complete, SASL handshake complete).

This is actually a pretty insidious bug because I think we end up in a state 
where we do not apply any request timeout to the channel if there is some delay 
in completing any of the handshaking/authentication steps, since the inflight 
requests are empty.


was (Author: david.mao):
[~dhofftgt] 

Looking at where the NetworkClient enters the CHECKING_API_VERSIONS state, we 
see:

 
{code:java}
if (discoverBrokerVersions) {
 this.connectionStates.checkingApiVersions(node); 
 nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
 {code}
which is a separate queue for nodes needing to send the api versions request.

Then in 

 
{code:java}
private void handleInitiateApiVersionRequests(long now) {
    Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter =
        nodesNeedingApiVersionsFetch.entrySet().iterator();
    while (iter.hasNext()) {
        Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
        String node = entry.getKey();
        if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
            log.debug("Initiating API versions fetch from node {}.", node);
            ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
            ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
            doSend(clientRequest, true, now);
            iter.remove();
        }{code}
we only send out the api versions request if the channel is ready (TLS 
handshake complete, SASL handshake complete).

This is actually a pretty insidious bug because I think we end up in a state 
where we do not apply any request timeout to the channel if there is some 
problem completing any of the handshaking/authentication steps, since the 
inflight requests are empty.

> Kafka Producer nodes stuck in CHECKING_API_VERSIONS
> ---
>
> Key: KAFKA-13388
> URL: https://issues.apache.org/jira/browse/KAFKA-13388
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: David Hoffman
>Priority: Minor
> Attachments: Screen Shot 2021-10-25 at 10.28.48 AM.png, 
> image-2021-10-21-13-42-06-528.png
>
>
> I have been seeing expired batch errors in my app.
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for 
> xxx-17:120002 ms has passed since batch creation
> {code}
>  I would have assumed a request timout or connection timeout should have also 
> been logged. I could not find any other associated errors. 
> I added some instrumenting to my app and have traced this down to broker 
> connections hanging in CHECKING_API_VERSIONS state. -It appears there is no 
> effective timeout for Kafka Producer broker connections in 
> CHECKING_API_VERSIONS state.-
> In the code see the after the NetworkClient connects to a broker node it 
> makes a request to check api versions, when it receives the response it marks 
> the node as ready. -I am seeing that sometimes a reply is not received for 
> the check api versions request the connection just hangs in 
> CHECKING_API_VERSIONS state until it is disposed I assume after the idle 
> connection timeout.-
> Update: not actually sure what causes the connection to get stuck in 
> CHECKING_API_VERSIONS.
> -I am guessing the connection setup timeout should be still in play for this, 
> but it is not.- 
>  -There is a connectingNodes set that is consulted when checking timeouts and 
> the node is removed- 
>  -when 

[jira] [Comment Edited] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2021-12-09 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17456854#comment-17456854
 ] 

David Mao edited comment on KAFKA-13388 at 12/10/21, 1:56 AM:
--

[~dhofftgt] 

Looking at where the NetworkClient enters the CHECKING_API_VERSIONS state, we 
see:

 
{code:java}
if (discoverBrokerVersions) {
 this.connectionStates.checkingApiVersions(node); 
 nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
 {code}
which is a separate queue for nodes needing to send the api versions request.

Then in 

 
{code:java}
private void handleInitiateApiVersionRequests(long now) {
    Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter =
        nodesNeedingApiVersionsFetch.entrySet().iterator();
    while (iter.hasNext()) {
        Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
        String node = entry.getKey();
        if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
            log.debug("Initiating API versions fetch from node {}.", node);
            ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
            ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
            doSend(clientRequest, true, now);
            iter.remove();
        }{code}
we only send out the api versions request if the channel is ready (TLS 
handshake complete, SASL handshake complete).

This is actually a pretty insidious bug because I think we end up in a state 
where we do not apply any request timeout to the channel if there is some 
problem completing any of the handshaking/authentication steps, since the 
inflight requests are empty.


was (Author: david.mao):
[~dhofftgt] 

Looking at where the NetworkClient enters the CHECKING_API_VERSIONS state, we 
see:

 
{code:java}
if (discoverBrokerVersions) {
 this.connectionStates.checkingApiVersions(node); 
 nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
 {code}
which is a separate queue for nodes needing to send the api versions request.

Then in 

 
{code:java}
private void handleInitiateApiVersionRequests(long now) {
    Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter =
        nodesNeedingApiVersionsFetch.entrySet().iterator();
    while (iter.hasNext()) {
        Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
        String node = entry.getKey();
        if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
            log.debug("Initiating API versions fetch from node {}.", node);
            ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
            ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
            doSend(clientRequest, true, now);
            iter.remove();
        }{code}
we only send out the api versions request if the channel is ready (TLS 
handshake complete, SASL handshake complete).

This is actually a pretty insidious bug because I think we end up in a state 
where we do not apply any request timeout to the channel, since the inflight 
requests are empty.

> Kafka Producer nodes stuck in CHECKING_API_VERSIONS
> ---
>
> Key: KAFKA-13388
> URL: https://issues.apache.org/jira/browse/KAFKA-13388
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: David Hoffman
>Priority: Minor
> Attachments: Screen Shot 2021-10-25 at 10.28.48 AM.png, 
> image-2021-10-21-13-42-06-528.png
>
>
> I have been seeing expired batch errors in my app.
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for 
> xxx-17:120002 ms has passed since batch creation
> {code}
>  I would have assumed a request timout or connection timeout should have also 
> been logged. I could not find any other associated errors. 
> I added some instrumenting to my app and have traced this down to broker 
> connections hanging in CHECKING_API_VERSIONS state. -It appears there is no 
> effective timeout for Kafka Producer broker connections in 
> CHECKING_API_VERSIONS state.-
> In the code see the after the NetworkClient connects to a broker node it 
> makes a request to check api versions, when it receives the response it marks 
> the node as ready. -I am seeing that sometimes a reply is not received for 
> the check api versions request the connection just hangs in 
> CHECKING_API_VERSIONS state until it is disposed I assume after the idle 
> connection timeout.-
> Update: not actually sure what causes the connection to get stuck in 
> CHECKING_API_VERSIONS.
> -I am guessing the connection setup timeout should be still in play for this, 
> but it is not.- 
>  -There is a connectingNodes set that is consulted when checking timeouts and 
> the node is removed- 
>  -when ClusterConnectionStates.checkingApiVersions(String id) is called to 
> transition the node into CHECKING_API_VERSIONS-




[jira] [Commented] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2021-12-09 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17456854#comment-17456854
 ] 

David Mao commented on KAFKA-13388:
---

[~dhofftgt] 

Looking at where the NetworkClient enters the CHECKING_API_VERSIONS state, we 
see:

 
{code:java}
if (discoverBrokerVersions) {
 this.connectionStates.checkingApiVersions(node); 
 nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
 {code}
which is a separate queue for nodes needing to send the api versions request.

Then in 

 
{code:java}
private void handleInitiateApiVersionRequests(long now) {
    Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter =
        nodesNeedingApiVersionsFetch.entrySet().iterator();
    while (iter.hasNext()) {
        Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
        String node = entry.getKey();
        if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
            log.debug("Initiating API versions fetch from node {}.", node);
            ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
            ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
            doSend(clientRequest, true, now);
            iter.remove();
        }{code}
we only send out the api versions request if the channel is ready (TLS 
handshake complete, SASL handshake complete).

This is actually a pretty insidious bug because I think we end up in a state 
where we do not apply any request timeout to the channel, since the inflight 
requests are empty.

> Kafka Producer nodes stuck in CHECKING_API_VERSIONS
> ---
>
> Key: KAFKA-13388
> URL: https://issues.apache.org/jira/browse/KAFKA-13388
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: David Hoffman
>Priority: Minor
> Attachments: Screen Shot 2021-10-25 at 10.28.48 AM.png, 
> image-2021-10-21-13-42-06-528.png
>
>
> I have been seeing expired batch errors in my app.
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for 
> xxx-17:120002 ms has passed since batch creation
> {code}
>  I would have assumed a request timout or connection timeout should have also 
> been logged. I could not find any other associated errors. 
> I added some instrumenting to my app and have traced this down to broker 
> connections hanging in CHECKING_API_VERSIONS state. -It appears there is no 
> effective timeout for Kafka Producer broker connections in 
> CHECKING_API_VERSIONS state.-
> In the code see the after the NetworkClient connects to a broker node it 
> makes a request to check api versions, when it receives the response it marks 
> the node as ready. -I am seeing that sometimes a reply is not received for 
> the check api versions request the connection just hangs in 
> CHECKING_API_VERSIONS state until it is disposed I assume after the idle 
> connection timeout.-
> Update: not actually sure what causes the connection to get stuck in 
> CHECKING_API_VERSIONS.
> -I am guessing the connection setup timeout should be still in play for this, 
> but it is not.- 
>  -There is a connectingNodes set that is consulted when checking timeouts and 
> the node is removed- 
>  -when ClusterConnectionStates.checkingApiVersions(String id) is called to 
> transition the node into CHECKING_API_VERSIONS-



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2021-11-05 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17439526#comment-17439526
 ] 

David Mao edited comment on KAFKA-13388 at 11/5/21, 10:20 PM:
--

[~dhofftgt]

Why do we expect a connection in CHECKING_API_VERSIONS to have in-flight 
requests?

I would expect the opposite: 

if a connection is in CHECKING_API_VERSIONS, it will *not* be ready for 
requests (at this point, the client does not know what API versions the broker 
supports, so it can't serialize requests to the appropriate version), so it 
should not have any inflight requests.


was (Author: david.mao):
[~dhofftgt]

Why do we expect a connection in CHECKING_API_VERSIONS to have in-flight 
requests?

I would expect the opposite: 

if a connection is in CHECKING_API_VERSIONS, it will *not* be ready for 
requests (at this point, the client does not know what API versions the broker 
supports, so it can't serialize requests to the appropriate version).

> Kafka Producer nodes stuck in CHECKING_API_VERSIONS
> ---
>
> Key: KAFKA-13388
> URL: https://issues.apache.org/jira/browse/KAFKA-13388
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: David Hoffman
>Priority: Minor
> Attachments: Screen Shot 2021-10-25 at 10.28.48 AM.png, 
> image-2021-10-21-13-42-06-528.png
>
>
> I have been seeing expired batch errors in my app.
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for 
> xxx-17:120002 ms has passed since batch creation
> {code}
>  I would have assumed a request timeout or connection timeout should have also 
> been logged. I could not find any other associated errors. 
> I added some instrumenting to my app and have traced this down to broker 
> connections hanging in CHECKING_API_VERSIONS state. -It appears there is no 
> effective timeout for Kafka Producer broker connections in 
> CHECKING_API_VERSIONS state.-
> In the code I see that after the NetworkClient connects to a broker node it 
> makes a request to check API versions; when it receives the response it marks 
> the node as ready. -I am seeing that sometimes a reply is not received for 
> the check API versions request, and the connection just hangs in 
> CHECKING_API_VERSIONS state until it is disposed, I assume after the idle 
> connection timeout.-
> Update: not actually sure what causes the connection to get stuck in 
> CHECKING_API_VERSIONS.
> -I am guessing the connection setup timeout should be still in play for this, 
> but it is not.- 
>  -There is a connectingNodes set that is consulted when checking timeouts and 
> the node is removed- 
>  -when ClusterConnectionStates.checkingApiVersions(String id) is called to 
> transition the node into CHECKING_API_VERSIONS-



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2021-11-05 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17439526#comment-17439526
 ] 

David Mao commented on KAFKA-13388:
---

[~dhofftgt]

Why do we expect a connection in CHECKING_API_VERSIONS to have in-flight 
requests?

I would expect the opposite: 

if a connection is in CHECKING_API_VERSIONS, it will *not* be ready for 
requests (at this point, the client does not know what API versions the broker 
supports, so it can't serialize requests to the appropriate version).
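For illustration, here is a minimal, hypothetical sketch of the point above (the names do not match the actual NetworkClient internals): a connection in CHECKING_API_VERSIONS is not yet ready for application requests, so no in-flight requests, and hence no per-request timeouts, are expected in that state.

{code:java}
// Hypothetical, simplified connection-state gate; names are assumptions,
// not the real Kafka client code.
enum ConnState { CONNECTING, CHECKING_API_VERSIONS, READY, DISCONNECTED }

final class ConnectionGate {
    private ConnState state = ConnState.CONNECTING;

    void onConnected() {
        // After the TCP/TLS connection completes, the client must first learn
        // the broker's supported API versions before serializing requests.
        state = ConnState.CHECKING_API_VERSIONS;
    }

    void onApiVersionsResponse() {
        // Only now does the client know which request versions the broker understands.
        state = ConnState.READY;
    }

    boolean canSendApplicationRequest() {
        // While CHECKING_API_VERSIONS, no application requests are in flight,
        // so per-request timeouts never fire; only an idle or setup timeout
        // can reclaim a connection stuck in this state.
        return state == ConnState.READY;
    }
}
{code}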

> Kafka Producer nodes stuck in CHECKING_API_VERSIONS
> ---
>
> Key: KAFKA-13388
> URL: https://issues.apache.org/jira/browse/KAFKA-13388
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: David Hoffman
>Priority: Minor
> Attachments: Screen Shot 2021-10-25 at 10.28.48 AM.png, 
> image-2021-10-21-13-42-06-528.png
>
>
> I have been seeing expired batch errors in my app.
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for 
> xxx-17:120002 ms has passed since batch creation
> {code}
>  I would have assumed a request timeout or connection timeout should have also 
> been logged. I could not find any other associated errors. 
> I added some instrumenting to my app and have traced this down to broker 
> connections hanging in CHECKING_API_VERSIONS state. -It appears there is no 
> effective timeout for Kafka Producer broker connections in 
> CHECKING_API_VERSIONS state.-
> In the code I see that after the NetworkClient connects to a broker node it 
> makes a request to check API versions; when it receives the response it marks 
> the node as ready. -I am seeing that sometimes a reply is not received for 
> the check API versions request, and the connection just hangs in 
> CHECKING_API_VERSIONS state until it is disposed, I assume after the idle 
> connection timeout.-
> Update: not actually sure what causes the connection to get stuck in 
> CHECKING_API_VERSIONS.
> -I am guessing the connection setup timeout should be still in play for this, 
> but it is not.- 
>  -There is a connectingNodes set that is consulted when checking timeouts and 
> the node is removed- 
>  -when ClusterConnectionStates.checkingApiVersions(String id) is called to 
> transition the node into CHECKING_API_VERSIONS-



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13360) Wrong SSL messages when handshake fails

2021-10-08 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17426334#comment-17426334
 ] 

David Mao commented on KAFKA-13360:
---

Very thorough writeup, nice find!

> Wrong SSL messages when handshake fails
> ---
>
> Key: KAFKA-13360
> URL: https://issues.apache.org/jira/browse/KAFKA-13360
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 2.8.0
> Environment: Two VMs, one running one Kafka broker and the other one 
> running kafka-console-consumer.sh.
> The consumer is validating the server certificate.
> Both VMs are VirtualBox running in the same laptop. 
> Using internal LAN.
> Latency is in the order of microseconds.
> More details in attached PDF.
>Reporter: Rodolfo Kohn
>Priority: Major
> Attachments: Kafka error.pdf, 
> dump_192.168.56.101_192.168.56.102_32776_9093_2021_10_06_21_09_19.pcap, 
> ssl_kafka_error_logs_match_ssl_logs.txt, 
> ssl_kafka_error_logs_match_ssl_logs2.txt
>
>
> When a consumer tries to connect to a Kafka broker and there is an error in 
> the SSL handshake, like the server sending a certificate that cannot be 
> validated for not matching the common name with the server/domain name, Kafka 
> sends out erroneous SSL messages before sending an SSL alert. This error 
> occurs on the client but can also be seen on the server.
> Because of the nature of the problem it seems it will happen in most if not 
> all handshake errors.
> I've debugged and analyzed the Kafka networking code in 
> org.apache.kafka.common.network and wrote a detailed description of how the 
> error occurs.
> Attaching the pcap file and a pdf with the detailed description of where the 
> error is in the networking code (SslTransportLayer, Channel, Selector).
> I executed a very basic test between kafka-console-consumer and a simple 
> installation of one Kafka broker with TLS.
> The test consisted of a Kafka broker with a certificate that didn’t match the 
> domain name I used to identify the server. The CA was well set up to avoid 
> related problems, such as an unknown CA error code. Thus, when the server sends 
> the certificate to the client, the handshake fails with error code 46 
> (certificate unknown). The goal was that my tool would detect the issue and 
> send an event describing a TLS handshake problem for both processes. 
> However, I noticed the tool sent what I thought was the wrong event: it 
> sent a TLS exception event for an unexpected message instead of an event for a 
> TLS alert for certificate unknown.
> I noticed that during the handshake, after the client receives Server Hello, 
> Certificate, Server Key Exchange, and Server Hello Done, it sends out the 
> same Client Hello it sent at the beginning and then 3 more records with all 
> zeroes, in two more messages. It sent a total of 16,709 Bytes including the 
> 289 Bytes of Client Hello record.
>  
> This also looks like a design error regarding how protocol failures are 
> handled.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13354) Topic metrics count request rate inconsistently with other request metrics

2021-10-06 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-13354:
--
Description: 
The request rate metrics in BrokerTopicMetrics are incremented per partition in 
a Produce request. If a produce request has multiple partitions for the same 
topic, then the request will get counted multiple times.

This is inconsistent with how we count request rate metrics elsewhere.

The same applies to the TotalFetchRequest rate metric

  was:
The request rate metrics in BrokerTopicMetrics are incremented per partition in 
a Produce request. If a produce request has multiple partitions for the same 
topic, then the request will get counted multiple times.

This is inconsistent with how we count request rate metrics elsewhere.

The same applies to the TotalFetchRequest rate


> Topic metrics count request rate inconsistently with other request metrics
> --
>
> Key: KAFKA-13354
> URL: https://issues.apache.org/jira/browse/KAFKA-13354
> Project: Kafka
>  Issue Type: Bug
>  Components: core, metrics
>Reporter: David Mao
>Priority: Minor
>
> The request rate metrics in BrokerTopicMetrics are incremented per partition 
> in a Produce request. If a produce request has multiple partitions for the 
> same topic, then the request will get counted multiple times.
> This is inconsistent with how we count request rate metrics elsewhere.
> The same applies to the TotalFetchRequest rate metric
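For illustration, a minimal sketch of counting a produce request once per topic instead of once per partition (assumed names, not the actual BrokerTopicMetrics code):

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch: deduplicate topics in a produce request so the
// per-topic request-rate meter is marked once per request, not once per
// partition of the same topic.
final class TopicRequestRateCounter {
    static void recordProduceRequest(List<TopicPartitionKey> partitionsInRequest,
                                     Consumer<String> markTopicRequestRate) {
        Set<String> topicsSeen = new HashSet<>();
        for (TopicPartitionKey tp : partitionsInRequest) {
            if (topicsSeen.add(tp.topic)) {
                markTopicRequestRate.accept(tp.topic); // once per topic per request
            }
        }
    }

    static final class TopicPartitionKey {
        final String topic;
        final int partition;
        TopicPartitionKey(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }
}
{code}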



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13354) Topic metrics count request rate inconsistently with other request metrics

2021-10-06 Thread David Mao (Jira)
David Mao created KAFKA-13354:
-

 Summary: Topic metrics count request rate inconsistently with 
other request metrics
 Key: KAFKA-13354
 URL: https://issues.apache.org/jira/browse/KAFKA-13354
 Project: Kafka
  Issue Type: Bug
  Components: core, metrics
Reporter: David Mao


The request rate metrics in BrokerTopicMetrics are incremented per partition in 
a Produce request. If a produce request has multiple partitions for the same 
topic, then the request will get counted multiple times.

This is inconsistent with how we count request rate metrics elsewhere.

The same applies to the TotalFetchRequest rate



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13341) Quotas are not applied to requests with null clientId

2021-10-03 Thread David Mao (Jira)
David Mao created KAFKA-13341:
-

 Summary: Quotas are not applied to requests with null clientId
 Key: KAFKA-13341
 URL: https://issues.apache.org/jira/browse/KAFKA-13341
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao


ClientQuotaManager.DefaultQuotaCallback will not check for the existence of a 
default quota if a request's clientId is null. This results in null clientIds 
bypassing quotas.

Null clientIds are permitted in the protocol, so this seems like a bug.

This looks like it may be a regression introduced by 
https://github.com/apache/kafka/pull/7372
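For illustration, a hedged sketch of a null-safe lookup (assumed names, not the actual ClientQuotaManager code): a null clientId should still fall through to the configured default quota instead of bypassing quotas entirely.

{code:java}
import java.util.Map;
import java.util.Optional;

// Hypothetical quota lookup: normalize a null clientId so the default-quota
// branch is still consulted; previously a null clientId skipped the check.
final class ClientQuotaLookup {
    private final Map<String, Double> quotasByClientId; // explicit per-client overrides
    private final Double defaultQuota;                  // may be null if no default is configured

    ClientQuotaLookup(Map<String, Double> quotasByClientId, Double defaultQuota) {
        this.quotasByClientId = quotasByClientId;
        this.defaultQuota = defaultQuota;
    }

    Optional<Double> quotaFor(String clientId) {
        String key = clientId == null ? "" : clientId; // treat null like an unknown client
        Double override = quotasByClientId.get(key);
        return Optional.ofNullable(override != null ? override : defaultQuota);
    }
}
{code}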



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13341) Quotas are not applied to requests with null clientId

2021-10-03 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-13341:
--
Priority: Major  (was: Minor)

> Quotas are not applied to requests with null clientId
> -
>
> Key: KAFKA-13341
> URL: https://issues.apache.org/jira/browse/KAFKA-13341
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Mao
>Priority: Major
>
> ClientQuotaManager.DefaultQuotaCallback will not check for the existence of a 
> default quota if a request's clientId is null. This results in null clientIds 
> bypassing quotas.
> Null clientIds are permitted in the protocol, so this seems like a bug.
> This looks like it may be a regression introduced by 
> https://github.com/apache/kafka/pull/7372



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13314) Pluggable components initialized with getConfiguredInstance do not respect dynamic config updates

2021-09-21 Thread David Mao (Jira)
David Mao created KAFKA-13314:
-

 Summary: Pluggable components initialized with 
getConfiguredInstance do not respect dynamic config updates
 Key: KAFKA-13314
 URL: https://issues.apache.org/jira/browse/KAFKA-13314
 Project: Kafka
  Issue Type: Bug
  Components: config, core
Reporter: David Mao






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13225) Controller skips sending UpdateMetadataRequest when shutting down broker doesnt host partitions

2021-08-24 Thread David Mao (Jira)
David Mao created KAFKA-13225:
-

 Summary: Controller skips sending UpdateMetadataRequest when 
shutting down broker doesnt host partitions 
 Key: KAFKA-13225
 URL: https://issues.apache.org/jira/browse/KAFKA-13225
 Project: Kafka
  Issue Type: Bug
  Components: controller
Reporter: David Mao
Assignee: David Mao


If a broker not hosting replicas for any partitions is shut down while there 
are offline partitions, the controller can fail to send out metadata updates to 
other brokers in the cluster.

 

Since this is a very niche scenario, I will leave the priority as Minor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13135) Reduce GroupMetadata lock contention for offset commit requests

2021-07-25 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17387028#comment-17387028
 ] 

David Mao edited comment on KAFKA-13135 at 7/26/21, 4:18 AM:
-

Taking a closer look, I think we can also optimize this for the happy path.

appendForGroup passes in the groupLock which gets locked during the entire 
putCacheCallback when completing the DelayedProduce from appending offset 
messages.

We already lock the groupLock inside of the callback when reading group state. 
-If we can maintain correctness without passing the groupLock to 
DelayedProduce, we can skip locking the group when sending back the offset 
response. I need to take a look at storeGroup to see if the groupLock is 
necessary there.-

 

https://issues.apache.org/jira/browse/KAFKA-6042 looks relevant before changing 
any of the locking semantics here. It looks like the DelayedProduce lock was 
originally added to avoid deadlock. 


was (Author: david.mao):
Taking a closer look, I think we can also optimize this for the happy path.

appendForGroup passes in the groupLock which gets locked during the entire 
putCacheCallback when completing the DelayedProduce from appending offset 
messages.

We already lock the groupLock inside of the callback when reading group state. 
If we can maintain correctness without passing the groupLock to DelayedProduce, 
we can skip locking the group when sending back the offset response. I need to 
take a look at storeGroup to see if the groupLock is necessary there.

 

https://issues.apache.org/jira/browse/KAFKA-6042 may be relevant before 
changing any of the locking semantics here.

> Reduce GroupMetadata lock contention for offset commit requests
> ---
>
> Key: KAFKA-13135
> URL: https://issues.apache.org/jira/browse/KAFKA-13135
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Mao
>Priority: Major
>
> as suggested by [~lbradstreet], we can look for similar optimizations to 
> https://issues.apache.org/jira/browse/KAFKA-13134 in the offset commit path.
> It looks like there are some straightforward optimizations possible for the 
> error path.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13135) Reduce GroupMetadata lock contention for offset commit requests

2021-07-25 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17387028#comment-17387028
 ] 

David Mao edited comment on KAFKA-13135 at 7/26/21, 3:48 AM:
-

Taking a closer look, I think we can also optimize this for the happy path.

appendForGroup passes in the groupLock which gets locked during the entire 
putCacheCallback when completing the DelayedProduce from appending offset 
messages.

We already lock the groupLock inside of the callback when reading group state. 
If we can maintain correctness without passing the groupLock to DelayedProduce, 
we can skip locking the group when sending back the offset response. I need to 
take a look at storeGroup to see if the groupLock is necessary there.

 

https://issues.apache.org/jira/browse/KAFKA-6042 may be relevant before 
changing any of the locking semantics here.


was (Author: david.mao):
Taking a closer look, I think we can also optimize this for the happy path.

appendForGroup passes in the groupLock which gets locked during the entire 
putCacheCallback when completing the DelayedProduce from appending offset 
messages.

We already lock the groupLock inside of the callback when reading group state. 
If we can maintain correctness without passing the groupLock to DelayedProduce, 
we can skip locking the group when sending back the offset response. I need to 
take a look at storeGroup to see if the groupLock is necessary there.

> Reduce GroupMetadata lock contention for offset commit requests
> ---
>
> Key: KAFKA-13135
> URL: https://issues.apache.org/jira/browse/KAFKA-13135
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Mao
>Priority: Major
>
> as suggested by [~lbradstreet], we can look for similar optimizations to 
> https://issues.apache.org/jira/browse/KAFKA-13134 in the offset commit path.
> It looks like there are some straightforward optimizations possible for the 
> error path.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13135) Reduce GroupMetadata lock contention for offset commit requests

2021-07-25 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17387028#comment-17387028
 ] 

David Mao edited comment on KAFKA-13135 at 7/26/21, 3:43 AM:
-

Taking a closer look, I think we can also optimize this for the happy path.

appendForGroup passes in the groupLock which gets locked during the entire 
putCacheCallback when completing the DelayedProduce from appending offset 
messages.

We already lock the groupLock inside of the callback when reading group state. 
If we can maintain correctness without passing the groupLock to DelayedProduce, 
we can skip locking the group when sending back the offset response. I need to 
take a look at storeGroup to see if the groupLock is necessary there.


was (Author: david.mao):
Taking a closer look, I think we can also optimize this for the happy path.

appendForGroup passes in the groupLock which gets locked during the entire 
putCacheCallback when completing the DelayedProduce from appending offset 
messages.

We already lock the groupLock inside of the callback when reading group state. 
If we can maintain correctness without passing the groupLock to DelayedProduce, 
we can achieve finer grained locking, and skip locking the group when sending 
back the offset response. I need to take a look at storeGroup to see if the 
groupLock is necessary there.

> Reduce GroupMetadata lock contention for offset commit requests
> ---
>
> Key: KAFKA-13135
> URL: https://issues.apache.org/jira/browse/KAFKA-13135
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Mao
>Priority: Major
>
> as suggested by [~lbradstreet], we can look for similar optimizations to 
> https://issues.apache.org/jira/browse/KAFKA-13134 in the offset commit path.
> It looks like there are some straightforward optimizations possible for the 
> error path.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13135) Reduce GroupMetadata lock contention for offset commit requests

2021-07-25 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17387028#comment-17387028
 ] 

David Mao edited comment on KAFKA-13135 at 7/26/21, 3:42 AM:
-

Taking a closer look, I think we can also optimize this for the happy path.

appendForGroup passes in the groupLock which gets locked during the entire 
putCacheCallback when completing the DelayedProduce from appending offset 
messages.

We already lock the groupLock inside of the callback when reading group state. 
If we can maintain correctness without passing the groupLock to DelayedProduce, 
we can achieve finer grained locking, and skip locking the group when sending 
back the offset response. I need to take a look at storeGroup to see if the 
groupLock is necessary there.


was (Author: david.mao):
Taking a closer look, I think we can also optimize this for the happy path.

appendForGroup passes in the groupLock which gets locked during the entire 
putCacheCallback when completing the DelayedProduce from appending offset 
messages.

We already lock the groupLock inside of the callback when reading group state, 
so we may be able to avoid passing in the groupLock, and achieve some finer 
grained locking. I need to take a look at storeGroup to see if the groupLock is 
necessary there.

> Reduce GroupMetadata lock contention for offset commit requests
> ---
>
> Key: KAFKA-13135
> URL: https://issues.apache.org/jira/browse/KAFKA-13135
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Mao
>Priority: Major
>
> as suggested by [~lbradstreet], we can look for similar optimizations to 
> https://issues.apache.org/jira/browse/KAFKA-13134 in the offset commit path.
> It looks like there are some straightforward optimizations possible for the 
> error path.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13135) Reduce GroupMetadata lock contention for offset commit requests

2021-07-25 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17387028#comment-17387028
 ] 

David Mao commented on KAFKA-13135:
---

Taking a closer look, I think we can also optimize this for the happy path.

appendForGroup passes in the groupLock which gets locked during the entire 
putCacheCallback when completing the DelayedProduce from appending offset 
messages.

We already lock the groupLock inside of the callback when reading group state, 
so we may be able to avoid passing in the groupLock, and achieve some finer 
grained locking. I need to take a look at storeGroup to see if the groupLock is 
necessary there.

> Reduce GroupMetadata lock contention for offset commit requests
> ---
>
> Key: KAFKA-13135
> URL: https://issues.apache.org/jira/browse/KAFKA-13135
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Mao
>Priority: Major
>
> as suggested by [~lbradstreet], we can look for similar optimizations to 
> https://issues.apache.org/jira/browse/KAFKA-13134 in the offset commit path.
> It looks like there are some straightforward optimizations possible for the 
> error path.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13135) Reduce GroupMetadata lock contention for offset commit requests

2021-07-25 Thread David Mao (Jira)
David Mao created KAFKA-13135:
-

 Summary: Reduce GroupMetadata lock contention for offset commit 
requests
 Key: KAFKA-13135
 URL: https://issues.apache.org/jira/browse/KAFKA-13135
 Project: Kafka
  Issue Type: Improvement
Reporter: David Mao


as suggested by [~lbradstreet], we can look for similar optimizations to 
https://issues.apache.org/jira/browse/KAFKA-13134 in the offset commit path.

It looks like there are some straightforward optimizations possible for the 
error path.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13134) Heartbeat Request high lock contention

2021-07-25 Thread David Mao (Jira)
David Mao created KAFKA-13134:
-

 Summary: Heartbeat Request high lock contention
 Key: KAFKA-13134
 URL: https://issues.apache.org/jira/browse/KAFKA-13134
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: David Mao
Assignee: David Mao


On a cluster with high heartbeat rate, a lock profile showed high contention 
for the GroupMetadata lock.

We can significantly reduce this by invoking the response callback outside of 
the group metadata lock.
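For illustration, a minimal sketch of the idea (assumed names; not the actual GroupCoordinator code): decide the heartbeat result while holding the group metadata lock, but invoke the response callback only after releasing it.

{code:java}
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Hypothetical sketch: the error code for the heartbeat is computed under the
// group lock, but the network-level response callback runs outside it, so a
// slow callback no longer blocks other operations on the same group.
final class HeartbeatHandler {
    private final ReentrantLock groupLock = new ReentrantLock();

    void handleHeartbeat(String memberId, Consumer<Short> responseCallback) {
        short errorCode;
        groupLock.lock();
        try {
            // Validate the member and update its session timer under the lock.
            errorCode = 0; // NONE
        } finally {
            groupLock.unlock();
        }
        // Completing the request outside the lock removes the callback from the
        // critical section, which the lock profile showed to be heavily contended.
        responseCallback.accept(errorCode);
    }
}
{code}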



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12986) Throttled Replicas validator should validate that the proposed value is parseable

2021-06-23 Thread David Mao (Jira)
David Mao created KAFKA-12986:
-

 Summary: Throttled Replicas validator should validate that the 
proposed value is parseable
 Key: KAFKA-12986
 URL: https://issues.apache.org/jira/browse/KAFKA-12986
 Project: Kafka
  Issue Type: Bug
  Components: admin, core
Reporter: David Mao


The ThrottledReplicaListValidator currently allows a string like 

leader.replication.throttled.replicas=,0:1 to be set

which is unparseable by the TopicConfig callback handler.

For robustness, the validator should also validate that the property can be 
parsed.
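For illustration, a hedged sketch of the extra validation (assumed names; not the actual ThrottledReplicaListValidator): reject entries that are not of the form partitionId:brokerId, which catches values like ",0:1" that contain an empty entry.

{code:java}
import java.util.regex.Pattern;

// Hypothetical validator sketch: the value must be "*" (all replicas), empty,
// or a comma-separated list of partitionId:brokerId pairs with no empty entries.
final class ThrottledReplicaListCheck {
    private static final Pattern ENTRY = Pattern.compile("\\d+:\\d+");

    static void validate(String name, String value) {
        if (value.trim().equals("*") || value.trim().isEmpty())
            return;
        for (String entry : value.split(",", -1)) {   // -1 keeps empty entries, as in ",0:1"
            if (!ENTRY.matcher(entry.trim()).matches())
                throw new IllegalArgumentException(name + " must be in the form " +
                        "[partitionId]:[brokerId],[partitionId]:[brokerId],... but got: " + value);
        }
    }
}
{code}

With this check, validate("leader.replication.throttled.replicas", ",0:1") would fail at alter-config time instead of reaching the TopicConfig callback handler.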



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10749) Add throttling of IPs by connection rate

2021-03-22 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao resolved KAFKA-10749.
---
Resolution: Fixed

> Add throttling of IPs by connection rate
> 
>
> Key: KAFKA-10749
> URL: https://issues.apache.org/jira/browse/KAFKA-10749
> Project: Kafka
>  Issue Type: New Feature
>  Components: core, network
>Reporter: David Mao
>Assignee: David Mao
>Priority: Major
> Fix For: 2.8.0
>
>
> This tracks the completion of IP connection rate throttling as detailed in
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12427) Broker does not close muted idle connections with buffered data

2021-03-04 Thread David Mao (Jira)
David Mao created KAFKA-12427:
-

 Summary: Broker does not close muted idle connections with 
buffered data
 Key: KAFKA-12427
 URL: https://issues.apache.org/jira/browse/KAFKA-12427
 Project: Kafka
  Issue Type: Bug
  Components: core, network
Reporter: David Mao
Assignee: David Mao






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10024) Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-19 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10024:
--
Parent: KAFKA-10749
Issue Type: Sub-task  (was: Improvement)

> Add dynamic configuration and enforce quota for per-IP connection rate limits 
> (KIP-612, part 2)
> ---
>
> Key: KAFKA-10024
> URL: https://issues.apache.org/jira/browse/KAFKA-10024
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Anna Povzner
>Assignee: David Mao
>Priority: Major
>  Labels: features
> Fix For: 2.8.0
>
>
> This JIRA is for the second part of KIP-612 – Add per-IP connection creation 
> rate limits.
> As described here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10748) Add IP connection rate throttling metric

2020-11-19 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10748:
--
Parent: KAFKA-10749
Issue Type: Sub-task  (was: Improvement)

> Add IP connection rate throttling metric
> 
>
> Key: KAFKA-10748
> URL: https://issues.apache.org/jira/browse/KAFKA-10748
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, network
>Affects Versions: 2.8.0
>Reporter: David Mao
>Assignee: David Mao
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10747) Implement ClientQuota APIs for altering and describing IP entity quotas

2020-11-19 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10747:
--
Parent: KAFKA-10749
Issue Type: Sub-task  (was: Improvement)

> Implement ClientQuota APIs for altering and describing IP entity quotas 
> 
>
> Key: KAFKA-10747
> URL: https://issues.apache.org/jira/browse/KAFKA-10747
> Project: Kafka
>  Issue Type: Sub-task
>  Components: config, core
>Affects Versions: 2.8.0
>Reporter: David Mao
>Assignee: David Mao
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10749) Add throttling of IPs by connection rate

2020-11-19 Thread David Mao (Jira)
David Mao created KAFKA-10749:
-

 Summary: Add throttling of IPs by connection rate
 Key: KAFKA-10749
 URL: https://issues.apache.org/jira/browse/KAFKA-10749
 Project: Kafka
  Issue Type: New Feature
  Components: core, network
Reporter: David Mao
Assignee: David Mao
 Fix For: 2.8.0


This tracks the completion of IP connection rate throttling as detailed in

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10748) Add IP connection rate throttling metric

2020-11-19 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10748:
--
Component/s: network
 core

> Add IP connection rate throttling metric
> 
>
> Key: KAFKA-10748
> URL: https://issues.apache.org/jira/browse/KAFKA-10748
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, network
>Affects Versions: 2.8.0
>Reporter: David Mao
>Assignee: David Mao
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10747) Implement ClientQuota APIs for altering and describing IP entity quotas

2020-11-19 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10747:
--
Component/s: core
 config

> Implement ClientQuota APIs for altering and describing IP entity quotas 
> 
>
> Key: KAFKA-10747
> URL: https://issues.apache.org/jira/browse/KAFKA-10747
> Project: Kafka
>  Issue Type: Improvement
>  Components: config, core
>Affects Versions: 2.8.0
>Reporter: David Mao
>Assignee: David Mao
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10748) Add IP connection rate throttling metric

2020-11-19 Thread David Mao (Jira)
David Mao created KAFKA-10748:
-

 Summary: Add IP connection rate throttling metric
 Key: KAFKA-10748
 URL: https://issues.apache.org/jira/browse/KAFKA-10748
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.8.0
Reporter: David Mao
Assignee: David Mao






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10747) Implement ClientQuota APIs for altering and describing IP entity quotas

2020-11-19 Thread David Mao (Jira)
David Mao created KAFKA-10747:
-

 Summary: Implement ClientQuota APIs for altering and describing IP 
entity quotas 
 Key: KAFKA-10747
 URL: https://issues.apache.org/jira/browse/KAFKA-10747
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.8.0
Reporter: David Mao
Assignee: David Mao






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10744) Listener and broker connection rate quota tests should use mock time

2020-11-18 Thread David Mao (Jira)
David Mao created KAFKA-10744:
-

 Summary: Listener and broker connection rate quota tests should 
use mock time
 Key: KAFKA-10744
 URL: https://issues.apache.org/jira/browse/KAFKA-10744
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: David Mao
Assignee: David Mao


Currently the tests for these features use system time because 
{{ConnectionQuotas}} uses {{Object.wait()}} to block the acceptor when waiting 
for throttle or to wait for a connection slot. We can extend the {{Time}} 
interface to provide this functionality, while still being able to mock out any 
object waiting in unit tests.
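For illustration, a hedged sketch of one way to do this (assumed method names; the real {{Time}} interface may differ): move the blocking wait behind the {{Time}} abstraction so a mock implementation can complete it without real sleeping.

{code:java}
import java.util.function.Supplier;

// Hypothetical extension of a Time-like interface (names are assumptions, not
// necessarily the real org.apache.kafka.common.utils.Time API): the acceptor
// blocks through this method, so a mock implementation can satisfy the wait
// immediately instead of relying on wall-clock time.
interface SimpleTime {
    long milliseconds();

    // Block on obj until condition becomes true or deadlineMs is reached.
    // Whoever flips the condition must call obj.notifyAll().
    default void waitOn(Object obj, Supplier<Boolean> condition, long deadlineMs)
            throws InterruptedException {
        synchronized (obj) {
            while (!condition.get()) {
                long remainingMs = deadlineMs - milliseconds();
                if (remainingMs <= 0)
                    throw new RuntimeException("Timed out waiting for condition");
                obj.wait(remainingMs);
            }
        }
    }
}
{code}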



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10695) Improve interface for sensor expiration

2020-11-07 Thread David Mao (Jira)
David Mao created KAFKA-10695:
-

 Summary: Improve interface for sensor expiration
 Key: KAFKA-10695
 URL: https://issues.apache.org/jira/browse/KAFKA-10695
 Project: Kafka
  Issue Type: Improvement
  Components: metrics
Reporter: David Mao


To check whether a sensor has expired, a user should be able to check the 
return value of {{Sensor.hasExpired()}}. However, recording to an expired 
sensor isn't prohibited, and if a user does record to an expired sensor, we can 
lose the information that the sensor was expired and removed from metrics.

The way we work around this currently is that all expiring sensors are 
retrieved from {{Metrics}} each time.

A potential improvement would be to add some internal state tracking whether a 
sensor has been expired (removed from {{Metrics}}), and hasExpired should then 
return that internal state. To do this, we would need to add a new method in Sensor 
that expresses the intent of expiring a sensor.
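For illustration, a hedged sketch of the proposed state (assumed names; not the actual Sensor API): an explicit expire() call sets an internal flag that hasExpired() reports, and record() can then refuse rather than silently drop data.

{code:java}
// Hypothetical sketch of a sensor that remembers it has been expired, so
// callers no longer need to re-query the Metrics registry to find out.
final class ExpiringSensor {
    private volatile boolean expired = false;

    // Called by the metrics registry when the sensor is removed after inactivity.
    void expire() {
        expired = true;
    }

    boolean hasExpired() {
        return expired;
    }

    void record(double value) {
        if (expired) {
            // Recording silently to a removed sensor would lose data; surfacing
            // the state lets the caller re-create the sensor instead.
            throw new IllegalStateException("Cannot record to an expired sensor");
        }
        // ... update underlying stats ...
    }
}
{code}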

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10693) Tests instantiate QuotaManagers without closing the managers in teardown

2020-11-06 Thread David Mao (Jira)
David Mao created KAFKA-10693:
-

 Summary: Tests instantiate QuotaManagers without closing the 
managers in teardown
 Key: KAFKA-10693
 URL: https://issues.apache.org/jira/browse/KAFKA-10693
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao
Assignee: David Mao






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10673) ConnectionQuotas should cache interbroker listener name

2020-11-03 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10673:
--
Description: 
ConnectionQuotas.protectedListener calls config.interBrokerListenerName. This 
is a surprisingly expensive call that creates a copy of all properties set on 
the config. Given that this method is called multiple times per connection 
created, this is not really ideal.

 

Profile attached showing allocations in protectedListener()

 

 

  was:
ConnectionQuotas.protectedListener calls `config.interBrokerListenerName`. This 
is a surprisingly expensive call that creates a copy of all properties set on 
the config. Given that this method is called multiple times per connection 
created, this is not really ideal.

 

Profile attached showing allocations in `protectedListener`

 

 


> ConnectionQuotas should cache interbroker listener name
> ---
>
> Key: KAFKA-10673
> URL: https://issues.apache.org/jira/browse/KAFKA-10673
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.7.0
>Reporter: David Mao
>Assignee: David Mao
>Priority: Minor
> Fix For: 2.8.0
>
> Attachments: Screen Shot 2020-11-03 at 8.34.48 PM.png
>
>
> ConnectionQuotas.protectedListener calls config.interBrokerListenerName. This 
> is a surprisingly expensive call that creates a copy of all properties set on 
> the config. Given that this method is called multiple times per connection 
> created, this is not really ideal.
>  
> Profile attached showing allocations in protectedListener()
>  
>  
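For illustration, a hedged sketch of the fix (assumed names; not the actual ConnectionQuotas code): read the inter-broker listener name once at construction time and keep it in a field, instead of re-deriving it from the config on every accepted connection.

{code:java}
// Hypothetical sketch: cache the inter-broker listener name instead of calling
// the expensive config accessor (which copies all properties) per connection.
final class ConnectionQuotasSketch {
    private final String interBrokerListenerName;

    ConnectionQuotasSketch(BrokerConfigView config) {
        // Computed once; config.interBrokerListenerName() is the expensive call.
        this.interBrokerListenerName = config.interBrokerListenerName();
    }

    boolean isInterBrokerListener(String listenerName) {
        // Called multiple times per accepted connection; now just a string compare.
        // (The real protected-listener check is more involved; this is simplified.)
        return listenerName.equals(interBrokerListenerName);
    }

    // Minimal stand-in for the broker config, for the sake of a runnable sketch.
    interface BrokerConfigView {
        String interBrokerListenerName();
    }
}
{code}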



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10673) ConnectionQuotas should cache interbroker listener name

2020-11-03 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10673:
--
Description: 
ConnectionQuotas.protectedListener calls `config.interBrokerListenerName`. This 
is a surprisingly expensive call that creates a copy of all properties set on 
the config. Given that this method is called multiple times per connection 
created, this is not really ideal.

 

Profile attached showing allocations in `protectedListener`

 

 

  was:
ConnectionQuotas.protectedListener calls `config.interBrokerListenerName`. This 
is a surprisingly expensive call that creates a copy of all properties set on 
the config. Given that this method is called multiple times per connection 
created, this is not really ideal.

 

Profile attached showing allocations. Allocations in Acceptor.run() made up 2% 
of the allocations on a fairly busy cluster.

 

 


> ConnectionQuotas should cache interbroker listener name
> ---
>
> Key: KAFKA-10673
> URL: https://issues.apache.org/jira/browse/KAFKA-10673
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.7.0
>Reporter: David Mao
>Assignee: David Mao
>Priority: Minor
> Fix For: 2.8.0
>
> Attachments: Screen Shot 2020-11-03 at 8.34.48 PM.png
>
>
> ConnectionQuotas.protectedListener calls `config.interBrokerListenerName`. 
> This is a surprisingly expensive call that creates a copy of all properties 
> set on the config. Given that this method is called multiple times per 
> connection created, this is not really ideal.
>  
> Profile attached showing allocations in `protectedListener`
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10673) ConnectionQuotas should cache interbroker listener name

2020-11-03 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10673:
--
Description: 
ConnectionQuotas.protectedListener calls `config.interBrokerListenerName`. This 
is a surprisingly expensive call that creates a copy of all properties set on 
the config. Given that this method is called multiple times per connection 
created, this is not really ideal.

 

Profile attached showing allocations. Allocations in Acceptor.run() made up 2% 
of the allocations on a fairly busy cluster.

 

 

  was:
!Screen Shot 2020-11-03 at 8.34.48 PM.png! ConnectionQuotas.protectedListener 
calls `config.interBrokerListenerName`. This is a surprisingly expensive call 
that creates a copy of all properties set on the config. Given that this method 
is called multiple times per connection created, this is not really ideal.

 

Profile attached showing allocations. Allocations in Acceptor.run() made up 2% 
of the allocations on a fairly busy cluster.

 


> ConnectionQuotas should cache interbroker listener name
> ---
>
> Key: KAFKA-10673
> URL: https://issues.apache.org/jira/browse/KAFKA-10673
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.7.0
>Reporter: David Mao
>Assignee: David Mao
>Priority: Minor
> Fix For: 2.8.0
>
> Attachments: Screen Shot 2020-11-03 at 8.34.48 PM.png
>
>
> ConnectionQuotas.protectedListener calls `config.interBrokerListenerName`. 
> This is a surprisingly expensive call that creates a copy of all properties 
> set on the config. Given that this method is called multiple times per 
> connection created, this is not really ideal.
>  
> Profile attached showing allocations. Allocations in Acceptor.run() made up 
> 2% of the allocations on a fairly busy cluster.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10673) ConnectionQuotas should cache interbroker listener name

2020-11-03 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10673:
--
Description: 
!Screen Shot 2020-11-03 at 8.34.48 PM.png! ConnectionQuotas.protectedListener 
calls `config.interBrokerListenerName`. This is a surprisingly expensive call 
that creates a copy of all properties set on the config. Given that this method 
is called multiple times per connection created, this is not really ideal.

 

Profile attached showing allocations. Allocations in Acceptor.run() made up 2% 
of the allocations on a fairly busy cluster.

 

  was:
ConnectionQuotas.protectedListener calls `config.interBrokerListenerName`. This 
is a surprisingly expensive call that creates a copy of all properties set on 
the config. Given that this method is called multiple times per connection 
created, this is not really ideal.

 

Profile attached showing allocations. Allocations in Acceptor.run() made up 2% 
of the allocations on a fairly busy cluster.

 


> ConnectionQuotas should cache interbroker listener name
> ---
>
> Key: KAFKA-10673
> URL: https://issues.apache.org/jira/browse/KAFKA-10673
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.7.0
>Reporter: David Mao
>Assignee: David Mao
>Priority: Minor
> Fix For: 2.8.0
>
> Attachments: Screen Shot 2020-11-03 at 8.34.48 PM.png
>
>
> !Screen Shot 2020-11-03 at 8.34.48 PM.png! ConnectionQuotas.protectedListener 
> calls `config.interBrokerListenerName`. This is a surprisingly expensive call 
> that creates a copy of all properties set on the config. Given that this 
> method is called multiple times per connection created, this is not really 
> ideal.
>  
> Profile attached showing allocations. Allocations in Acceptor.run() made up 
> 2% of the allocations on a fairly busy cluster.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10673) ConnectionQuotas should cache interbroker listener name

2020-11-03 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10673:
--
Attachment: Screen Shot 2020-11-03 at 8.34.48 PM.png

> ConnectionQuotas should cache interbroker listener name
> ---
>
> Key: KAFKA-10673
> URL: https://issues.apache.org/jira/browse/KAFKA-10673
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.7.0
>Reporter: David Mao
>Assignee: David Mao
>Priority: Minor
> Fix For: 2.8.0
>
> Attachments: Screen Shot 2020-11-03 at 8.34.48 PM.png
>
>
> ConnectionQuotas.protectedListener calls `config.interBrokerListenerName`. 
> This is a surprisingly expensive call that creates a copy of all properties 
> set on the config. Given that this method is called multiple times per 
> connection created, this is not really ideal.
>  
> Profile attached showing allocations. Allocations in Acceptor.run() made up 
> 2% of the allocations on a fairly busy cluster.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10673) ConnectionQuotas should cache interbroker listener name

2020-11-03 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10673:
--
Description: 
ConnectionQuotas.protectedListener calls `config.interBrokerListenerName`. This 
is a surprisingly expensive call that creates a copy of all properties set on 
the config. Given that this method is called multiple times per connection 
created, this is not really ideal.

 

Profile attached showing allocations. Allocations in Acceptor.run() made up 2% 
of the allocations on a fairly busy cluster.

 

  was:ConnectionQuotas.protectedListener calls 
`config.interBrokerListenerName`. This is a surprisingly expensive call that 
creates a copy of all properties set on the config. Given that this method is 
called multiple times per connection created, this is not really ideal.


> ConnectionQuotas should cache interbroker listener name
> ---
>
> Key: KAFKA-10673
> URL: https://issues.apache.org/jira/browse/KAFKA-10673
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.7.0
>Reporter: David Mao
>Assignee: David Mao
>Priority: Minor
> Fix For: 2.8.0
>
>
> ConnectionQuotas.protectedListener calls `config.interBrokerListenerName`. 
> This is a surprisingly expensive call that creates a copy of all properties 
> set on the config. Given that this method is called multiple times per 
> connection created, this is not really ideal.
>  
> Profile attached showing allocations. Allocations in Acceptor.run() made up 
> 2% of the allocations on a fairly busy cluster.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10673) ConnectionQuotas should cache interbroker listener name

2020-11-03 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10673:
--
Priority: Minor  (was: Major)

> ConnectionQuotas should cache interbroker listener name
> ---
>
> Key: KAFKA-10673
> URL: https://issues.apache.org/jira/browse/KAFKA-10673
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.7.0
>Reporter: David Mao
>Assignee: David Mao
>Priority: Minor
> Fix For: 2.8.0
>
>
> ConnectionQuotas.protectedListener calls `config.interBrokerListenerName`. 
> This is a surprisingly expensive call that creates a copy of all properties 
> set on the config. Given that this method is called multiple times per 
> connection created, this is not really ideal.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10673) ConnectionQuotas should cache interbroker listener name

2020-11-02 Thread David Mao (Jira)
David Mao created KAFKA-10673:
-

 Summary: ConnectionQuotas should cache interbroker listener name
 Key: KAFKA-10673
 URL: https://issues.apache.org/jira/browse/KAFKA-10673
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.7.0
Reporter: David Mao
Assignee: David Mao
 Fix For: 2.8.0


ConnectionQuotas.protectedListener calls `config.interBrokerListenerName`. This 
is a surprisingly expensive call that creates a copy of all properties set on 
the config. Given that this method is called multiple times per connection 
created, this is not really ideal.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9189) Shutdown is blocked if connection to Zookeeper is lost

2020-06-10 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao reassigned KAFKA-9189:


Assignee: David Mao

> Shutdown is blocked if connection to Zookeeper is lost
> --
>
> Key: KAFKA-9189
> URL: https://issues.apache.org/jira/browse/KAFKA-9189
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0
> Environment: Linux, Docker 19.03.4
>Reporter: Boris Granveaud
>Assignee: David Mao
>Priority: Minor
>
> We are using Kafka and Zookeeper in Docker swarm stacks. When we undeploy a 
> stack, sometimes Kafka doesn't shut down properly and is eventually killed by 
> Docker (thus leaving potentially corrupted files).
> Here are the steps to reproduce (simple Docker, no swarm):
>  
> {code:java}
> docker network create test
> docker run -d --network test --name zk --rm zookeeper:3.5.6
> docker run --network test --name kf --rm -e "KAFKA_ZOOKEEPER_CONNECT=zk:2181" 
> -e "KAFKA_ADVERTISED_LISTENERS=INSIDE://:9091" -e 
> "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT" -e 
> "KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE" confluentinc/cp-kafka:5.3.1
> {code}
>  
> In another shell:
>  
> {code:java}
> docker stop zk
> docker stop kf
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9189) Shutdown is blocked if connection to Zookeeper is lost

2020-04-13 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17082642#comment-17082642
 ] 

David Mao commented on KAFKA-9189:
--

also observed this issue, took a thread-dump and found that 
controlledShutdown() can block indefinitely in zkClient.getControllerId
{code:java}
"kafka-shutdown-hook" #24 prio=5 os_prio=0 cpu=9.96ms elapsed=184687.63s 
tid=0x7f3b20786000 nid=0x4e69 waiting on condition [0x7f3a8559a000]
 java.lang.Thread.State: TIMED_WAITING (parking)
 at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method)
 - parking to wait for <0x0006884f6808> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at 
java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.5/LockSupport.java:234)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(java.base@11.0.5/AbstractQueuedSynchronizer.java:2123)
 at 
kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:264)
 at 
kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:258)
 at 
kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$1(ZooKeeperClient.scala:252)
 at 
kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:252)
 at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1783)
 at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1753)
 at kafka.zk.KafkaZkClient.retryRequestUntilConnected(KafkaZkClient.scala:1748)
 at kafka.zk.KafkaZkClient.getControllerId(KafkaZkClient.scala:1119)
 at kafka.server.KafkaServer.doControlledShutdown$1(KafkaServer.scala:688)
 at kafka.server.KafkaServer.controlledShutdown(KafkaServer.scala:772)
 at kafka.server.KafkaServer.$anonfun$shutdown$2(KafkaServer.scala:794)
 at 
kafka.server.KafkaServer$$Lambda$1583/0x000840924c40.apply$mcV$sp(Unknown 
Source)
 at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
 at kafka.server.KafkaServer.shutdown(KafkaServer.scala:794)
 at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:54)
 at kafka.Kafka$.$anonfun$main$3(Kafka.scala:80)
 at kafka.Kafka$$$Lambda$172/0x0008402a1440.apply$mcV$sp(Unknown Source)
 at kafka.utils.Exit$.$anonfun$addShutdownHook$1(Exit.scala:38)
 at kafka.Kafka$$$Lambda$173/0x0008402a7840.run(Unknown Source)
 at java.lang.Thread.run(java.base@11.0.5/Thread.java:834){code}

> Shutdown is blocked if connection to Zookeeper is lost
> --
>
> Key: KAFKA-9189
> URL: https://issues.apache.org/jira/browse/KAFKA-9189
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0
> Environment: Linux, Docker 19.03.4
>Reporter: Boris Granveaud
>Priority: Minor
>
> We are using Kafka and Zookeeper in Docker swarm stacks. When we undeploy a 
> stack, sometimes Kafka doesn't shut down properly and is eventually killed by 
> Docker (thus leaving potentially corrupted files).
> Here are the steps to reproduce (simple Docker, no swarm):
>  
> {code:java}
> docker network create test
> docker run -d --network test --name zk --rm zookeeper:3.5.6
> docker run --network test --name kf --rm -e "KAFKA_ZOOKEEPER_CONNECT=zk:2181" 
> -e "KAFKA_ADVERTISED_LISTENERS=INSIDE://:9091" -e 
> "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT" -e 
> "KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE" confluentinc/cp-kafka:5.3.1
> {code}
>  
> In another shell:
>  
> {code:java}
> docker stop zk
> docker stop kf
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is invali

2020-02-10 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao reassigned KAFKA-6266:


Assignee: David Mao  (was: Anna Povzner)

> Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of 
> __consumer_offsets-xx to log start offset 203569 since the checkpointed 
> offset 120955 is invalid. (kafka.log.LogCleanerManager$)
> --
>
> Key: KAFKA-6266
> URL: https://issues.apache.org/jira/browse/KAFKA-6266
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.0.0, 1.0.1
> Environment: CentOS 7, Apache kafka_2.12-1.0.0
>Reporter: VinayKumar
>Assignee: David Mao
>Priority: Major
> Fix For: 2.5.0, 2.4.1
>
>
> I upgraded Kafka from 0.10.2.1 to version 1.0.0. Since then, I see the below 
> warnings in the log.
>  I'm seeing these continuously in the log and want them to be fixed so 
> that they won't repeat. Can someone please help me fix the below 
> warnings?
> {code}
> WARN Resetting first dirty offset of __consumer_offsets-17 to log start 
> offset 3346 since the checkpointed offset 3332 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-23 to log start 
> offset 4 since the checkpointed offset 1 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-19 to log start 
> offset 203569 since the checkpointed offset 120955 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-35 to log start 
> offset 16957 since the checkpointed offset 7 is invalid. 
> (kafka.log.LogCleanerManager$)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9507) AdminClient should check for missing committed offsets

2020-02-06 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032026#comment-17032026
 ] 

David Mao commented on KAFKA-9507:
--

[https://github.com/apache/kafka/pull/8057]

Pull request to address this issue.

> AdminClient should check for missing committed offsets
> --
>
> Key: KAFKA-9507
> URL: https://issues.apache.org/jira/browse/KAFKA-9507
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: David Mao
>Priority: Major
>  Labels: newbie
>
> I noticed this exception getting raised:
> {code}
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>   at 
> org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$24$1.handleResponse(KafkaAdminClient.java:2832)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1032)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1160)
> {code}
> The AdminClient should check for negative offsets in OffsetFetch responses in 
> the api `listConsumerGroupOffsets`.
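For illustration, a hedged sketch of the check (assumed names; not the actual KafkaAdminClient code): a negative offset in the OffsetFetch response means the group has no committed offset for that partition, so the entry should be mapped to null (or skipped) rather than passed to OffsetAndMetadata.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: map raw fetched offsets to committed offsets, treating
// a negative offset (no committed offset for the partition) as null rather
// than constructing an OffsetAndMetadata that rejects negative values.
final class CommittedOffsetMapper {
    static Map<String, Long> toCommittedOffsets(Map<String, Long> rawOffsetsByPartition) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : rawOffsetsByPartition.entrySet()) {
            Long offset = entry.getValue();
            boolean missing = offset == null || offset < 0;
            result.put(entry.getKey(), missing ? null : offset);
        }
        return result;
    }
}
{code}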



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9507) AdminClient should check for missing committed offsets

2020-02-06 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao reassigned KAFKA-9507:


Assignee: David Mao

> AdminClient should check for missing committed offsets
> --
>
> Key: KAFKA-9507
> URL: https://issues.apache.org/jira/browse/KAFKA-9507
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: David Mao
>Priority: Major
>  Labels: newbie
>
> I noticed this exception getting raised:
> {code}
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>   at 
> org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$24$1.handleResponse(KafkaAdminClient.java:2832)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1032)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1160)
> {code}
> The AdminClient should check for negative offsets in OffsetFetch responses in 
> the api `listConsumerGroupOffsets`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8583) Optimization for SslTransportLayer#write(ByteBuffer)

2019-06-21 Thread Mao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mao updated KAFKA-8583:
---
Description: 
This is a proposal to optimize SslTransportLayer#write(ByteBuffer).

Currently, sslEngine only reads data from src once per selector loop.

This can be optimized by reading all src data in one selector loop.

The proposal comes from [Ambry|https://github.com/linkedin/ambry/pull/1105], 
which uses the same transport layer code; the change has been deployed to prod 
for 6 months. Unfortunately, Ambry didn't record the perf diff.

 

Code change: [https://github.com/apache/kafka/pull/6984]

Let me know if anything is needed. 

 

  was:
This is a proposal to optimize SslTransportLayer#write(ByteBuffer).

Currently, sslEngine only reads data from src once per selector loop.

This can be optimized by reading all src data in one selector loop.

The proposal comes from [Ambry|https://github.com/linkedin/ambry/pull/1105], 
which uses the same transport layer code. 

 

Code change: [https://github.com/apache/kafka/pull/6984]

Let me know if anything is needed. 

 


> Optimization for SslTransportLayer#write(ByteBuffer)
> 
>
> Key: KAFKA-8583
> URL: https://issues.apache.org/jira/browse/KAFKA-8583
> Project: Kafka
>  Issue Type: Improvement
>  Components: network
>Reporter: Mao
>Priority: Major
>
> This is a proposal to optimize SslTransportLayer#write(ByteBuffer).
> Currently, sslEngine only reads data from src once per selector loop.
> This can be optimized by reading all src data in one selector loop.
> The proposal comes from [Ambry|https://github.com/linkedin/ambry/pull/1105], 
> which uses the same transport layer code; the change has been deployed to prod 
> for 6 months. Unfortunately, Ambry didn't record the perf diff.
>  
> Code change: [https://github.com/apache/kafka/pull/6984]
> Let me know if anything is needed. 
>  
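For illustration, a hedged and much-simplified sketch of the idea (not the actual SslTransportLayer code): keep calling SSLEngine.wrap and flushing the network buffer while the source buffer still has data, instead of wrapping only one chunk per selector loop.

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;

// Hypothetical, simplified write loop: keep wrapping and flushing while the
// source buffer has data, instead of wrapping a single chunk per selector loop.
// A real implementation must also keep any unflushed netWriteBuffer contents
// across calls and handle BUFFER_OVERFLOW/CLOSED statuses properly.
final class SslWriteSketch {
    static int writeAll(SSLEngine engine, SocketChannel channel,
                        ByteBuffer src, ByteBuffer netWriteBuffer) throws IOException {
        int consumedFromSrc = 0;
        while (src.hasRemaining()) {
            netWriteBuffer.clear();
            SSLEngineResult result = engine.wrap(src, netWriteBuffer);
            if (result.getStatus() != SSLEngineResult.Status.OK)
                break;                          // let the caller deal with non-OK statuses
            consumedFromSrc += result.bytesConsumed();
            netWriteBuffer.flip();
            while (netWriteBuffer.hasRemaining()) {
                if (channel.write(netWriteBuffer) == 0)
                    return consumedFromSrc;     // socket buffer full; resume when writable again
            }
        }
        return consumedFromSrc;
    }
}
{code}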



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8583) Optimization for SslTransportLayer#write(ByteBuffer)

2019-06-21 Thread Mao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mao updated KAFKA-8583:
---
Description: 
This is a proposal to optimize SslTransportLayer#write(ByteBuffer).

Currently, sslEngine only reads data from src once per selector loop.

This can be optimized by reading all src data in one selector loop.

The proposal comes from [Ambry|https://github.com/linkedin/ambry/pull/1105], 
which uses the same transport layer code. 

 

Code change: [https://github.com/apache/kafka/pull/6984]

Let me know if anything is needed. 

 

  was:
This is a proposal to optimize SslTransportLayer#write(ByteBuffer).

Currently, sslEngine only reads data from src once per selector loop.

This can be optimized by reading all src data in one selector loop.

The proposal comes from [Ambry|https://github.com/linkedin/ambry/pull/1105], 
which uses the same transport layer code. 

 


> Optimization for SslTransportLayer#write(ByteBuffer)
> 
>
> Key: KAFKA-8583
> URL: https://issues.apache.org/jira/browse/KAFKA-8583
> Project: Kafka
>  Issue Type: Improvement
>  Components: network
>Reporter: Mao
>Priority: Major
>
> This is a proposal to optimize SslTransportLayer#write(ByteBuffer).
> Currently, sslEngine only reads data from src once per selector loop.
> This can be optimized by reading all src data in one selector loop.
> The proposal comes from [Ambry|https://github.com/linkedin/ambry/pull/1105], 
> which uses the same transport layer code. 
>  
> Code change: [https://github.com/apache/kafka/pull/6984]
> Let me know if anything is needed. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8583) Optimization for SslTransportLayer#write(ByteBuffer)

2019-06-21 Thread Mao (JIRA)
Mao created KAFKA-8583:
--

 Summary: Optimization for SslTransportLayer#write(ByteBuffer)
 Key: KAFKA-8583
 URL: https://issues.apache.org/jira/browse/KAFKA-8583
 Project: Kafka
  Issue Type: Improvement
  Components: network
Reporter: Mao


This is a proposal to optimize SslTransportLayer#write(ByteBuffer).

Currently, sslEngine only reads data from src once per selector loop.

This can be optimized by reading all src data in one selector loop.

The proposal comes from [Ambry|https://github.com/linkedin/ambry/pull/1105], 
which uses the same transport layer code. 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)