[jira] [Updated] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-02-18 Thread Cameron Redpath (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cameron Redpath updated KAFKA-16277:

Description: 
Consider the following scenario:

`topic-1`: 12 partitions

`topic-2`: 12 partitions

 

Of note, `topic-1` gets approximately 10 times more messages through it than 
`topic-2`. 

 

Both of these topics are consumed by a single application, single consumer 
group, which scales under load. Each member of the consumer group subscribes to 
both topics. The `partition.assignment.strategy` being used is 
`org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The application 
may start with one consumer. It consumes all partitions from both topics.

 

The problem begins when the application scales up to two consumers. What is 
seen is that all partitions from `topic-1` go to one consumer, and all 
partitions from `topic-2` go to the other consumer. In the case with one topic 
receiving more messages than the other, this results in a very imbalanced group 
where one consumer is receiving 10x the traffic of the other due to partition 
assignment.

 

This is the issue being seen in our cluster at the moment. See this graph of 
the number of messages being processed by each consumer as the group scales 
from one to four consumers:

!image-2024-02-19-13-00-28-306.png|width=537,height=612!

Things to note from this graphic:
 * With two consumers, the partitions for a topic all go to a single consumer 
each
 * With three consumers, the partitions for a topic are split between two 
consumers each
 * With four consumers, the partitions for a topic are split between three 
consumers each
 * The total number of messages being processed by each consumer in the group 
is very imbalanced throughout the entire period

 

With regard to the number of _partitions_ being assigned to each consumer, the 
group is balanced. However, the assignment appears to be biased so that 
partitions from the same topic go to the same consumer. In our scenario, this 
leads to very undesirable partition assignment.

 

I question if the behaviour of the assignor should be revised, so that each 
topic has its partitions maximally spread across all available members of the 
consumer group. In the above scenario, this would result in much more even 
distribution of load. The behaviour would then be:
 * With two consumers, 6 partitions from each topic go to each consumer
 * With three consumers, 4 partitions from each topic go to each consumer
 * With four consumers, 3 partitions from each topic go to each consumer
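For a quick sanity check of the distribution proposed in the list above, here is a toy sketch (plain per-topic round-robin, written for illustration only; it is not the assignor's actual algorithm and not part of this ticket) that reproduces those per-consumer counts for two 12-partition topics:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy illustration: spread each topic's partitions round-robin over the group
// members and print the resulting assignment for 2, 3 and 4 consumers.
public class PerTopicSpreadDemo {
    public static void main(String[] args) {
        List<String> topics = List.of("topic-1", "topic-2");
        int partitionsPerTopic = 12;
        for (int consumers = 2; consumers <= 4; consumers++) {
            Map<String, List<String>> assignment = new HashMap<>();
            for (String topic : topics) {
                for (int p = 0; p < partitionsPerTopic; p++) {
                    String member = "consumer-" + (p % consumers); // round-robin within the topic
                    assignment.computeIfAbsent(member, k -> new ArrayList<>()).add(topic + "-" + p);
                }
            }
            System.out.println(consumers + " consumers -> " + assignment);
        }
    }
}
```

With 2, 3 and 4 consumers this prints 6, 4 and 3 partitions from each topic per consumer, matching the bullets above.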

 

Of note, we only saw this behaviour after migrating to the 
`CooperativeStickyAssignor`. It was not an issue with the default partition 
assignment strategy.

 

It is possible this may be intended behaviour. In which case, what is the 
preferred workaround for our scenario? Our current workaround if we decide to 
go ahead with the update to `CooperativeStickyAssignor` may be to limit our 
consumers so they only subscribe to one topic, and have two consumer threads 
per instance of the application.  
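For reference, a minimal sketch of that workaround is shown below: one application instance running two consumer threads, each subscribed to a single topic. This is illustrative only; the bootstrap address is a placeholder, and the use of a separate group id per topic is an assumption rather than something specified here.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class SingleTopicConsumerThreads {

    private static KafkaConsumer<String, String> newConsumer(String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);                   // assumed: one group per topic
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
        return new KafkaConsumer<>(props);
    }

    public static void main(String[] args) {
        // One thread per topic; each consumer subscribes to exactly one topic,
        // so scaling out instances spreads that topic's partitions evenly.
        for (String topic : List.of("topic-1", "topic-2")) {
            new Thread(() -> {
                try (KafkaConsumer<String, String> consumer = newConsumer("app-" + topic)) {
                    consumer.subscribe(List.of(topic));
                    while (true) {
                        consumer.poll(Duration.ofMillis(500)).forEach(record ->
                                System.out.printf("%s-%d@%d%n", record.topic(), record.partition(), record.offset()));
                    }
                }
            }, "consumer-" + topic).start();
        }
    }
}
```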

  was:
Consider the following scenario:

`topic-1`: 12 partitions

`topic-2`: 12 partitions

 

Of note, `topic-1` gets approximately 10 times more messages through it than 
`topic-2`. 

 

Both of these topics are consumed by a single application, single consumer 
group, which scales under load. Each member of the consumer group subscribes to 
both topics. The `partition.assignment.strategy` being used is 
`org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The application 
may start with one consumer. It consumes all partitions from both topics.

 

The problem begins when the application scales up to two consumers. What is 
seen is that all partitions from `topic-1` go to one consumer, and all 
partitions from `topic-2` go to the other consumer. In the case with one topic 
receiving more messages than the other, this results in a very imbalanced group 
where one consumer is receiving 10x the traffic of the other due to partition 
assignment.

 

This is the issue being seen in our cluster at the moment. See this graph of 
the number of messages being processed by each consumer as the group scales 
from one to four consumers:

!image-2024-02-19-13-00-28-306.png|width=537,height=612!

Things to note from this graphic:
 * With two consumers, the partitions for a topic all go to a single consumer 
each
 * With three consumers, the partitions for a topic are split between two 
consumers each
 * With four consumers, the partitions for a topic are split between three 
consumers each
 * The total number of messages being processed by each consumer group is very 
imbalanced throughout the entire period

 

With regard to the number of _partitions_ being assigned to each consumer, the 
group is balanced. However, the assignment appears to be biased so that 
partitions from the same topic go to the same 

Re: [PR] MINOR: Fix MetricsTest.testBrokerTopicMetricsBytesInOut assertion [kafka]

2024-02-18 Thread via GitHub


showuon commented on PR #14744:
URL: https://github.com/apache/kafka/pull/14744#issuecomment-1951789085

   Retriggering CI: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14744/4/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-02-18 Thread via GitHub


showuon commented on PR #15213:
URL: https://github.com/apache/kafka/pull/15213#issuecomment-1951764274

   @clolov , there's a compilation error in JDK 8 / Scala 2.12. Could you help fix it? Maybe rebasing onto the latest trunk would solve it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14467: Add a test to validate the replica state after processing the OFFSET_MOVED_TO_TIERED_STORAGE error [kafka]

2024-02-18 Thread via GitHub


showuon commented on PR #14652:
URL: https://github.com/apache/kafka/pull/14652#issuecomment-1951760048

   @hudeqi , are you still interested in completing this PR? There seem to be some unaddressed comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-02-18 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16277:
--
Component/s: clients
 consumer

> CooperativeStickyAssignor does not spread topics evenly among consumer group
> 
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Cameron Redpath
>Priority: Major
> Attachments: image-2024-02-19-13-00-28-306.png
>
>
> Consider the following scenario:
> `topic-1`: 12 partitions
> `topic-2`: 12 partitions
>  
> Of note, `topic-1` gets approximately 10 times more messages through it than 
> `topic-2`. 
>  
> Both of these topics are consumed by a single application, single consumer 
> group, which scales under load. Each member of the consumer group subscribes 
> to both topics. The `partition.assignment.strategy` being used is 
> `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The 
> application may start with one consumer. It consumes all partitions from both 
> topics.
>  
> The problem begins when the application scales up to two consumers. What is 
> seen is that all partitions from `topic-1` go to one consumer, and all 
> partitions from `topic-2` go to the other consumer. In the case with one 
> topic receiving more messages than the other, this results in a very 
> imbalanced group where one consumer is receiving 10x the traffic of the other 
> due to partition assignment.
>  
> This is the issue being seen in our cluster at the moment. See this graph of 
> the number of messages being processed by each consumer as the group scales 
> from one to four consumers:
> !image-2024-02-19-13-00-28-306.png|width=537,height=612!
> Things to note from this graphic:
>  * With two consumers, the partitions for a topic all go to a single consumer 
> each
>  * With three consumers, the partitions for a topic are split between two 
> consumers each
>  * With four consumers, the partitions for a topic are split between three 
> consumers each
>  * The total number of messages being processed by each consumer group is 
> very imbalanced throughout the entire period
>  
> With regard to the number of _partitions_ being assigned to each consumer, 
> the group is balanced. However, the assignment appears to be biased so that 
> partitions from the same topic go to the same consumer. In our scenario, this 
> leads to very undesirable partition assignment.
>  
> I question if the behaviour of the assignor should be revised, so that each 
> topic has its partitions maximally spread across all available members of the 
> consumer group. In the above scenario, this would result in much more even 
> distribution of load. The behaviour would then be:
>  * With two consumers, 6 partitions from each topic go to each consumer
>  * With three consumers, 4 partitions from each topic go to each consumer
>  * With four consumers, 3 partitions from each topic go to each consumer
>  
> Of note, we only saw this behaviour after migrating to the 
> `CooperativeStickyAssignor`. It was not an issue with the default partition 
> assignment strategy.
>  
> It is possible this may be intended behaviour. In which case, what is the 
> preferred workaround for our scenario? Our current workaround if we decide to 
> go ahead with the update to `CooperativeStickyAssignor` may be to limit our 
> consumers so they only subscribe to one topic, and have two consumer threads 
> per instance of the application.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-7504 Mitigate sendfile(2) blocking in network threads by warming up fetch data [kafka]

2024-02-18 Thread via GitHub


showuon commented on code in PR #14289:
URL: https://github.com/apache/kafka/pull/14289#discussion_r1493974832


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1408,6 +1408,16 @@ class ReplicaManager(val config: KafkaConfig,
 // progress in such cases and don't need to report a `RecordTooLargeException`
 new FetchDataInfo(givenFetchedDataInfo.fetchOffsetMetadata, MemoryRecords.EMPTY)
   } else {
+// For the last segment we assume the data is hot enough to still be fully in the page cache.
+// Most fetch requests read from the tail of the log, so this check should avoid a significant
+// number of additional sendfile(2) calls to /dev/null for populating the page cache.
+if (!givenFetchedDataInfo.isLastSegment && givenFetchedDataInfo.records.isInstanceOf[FileRecords]) {
+  try {
+    givenFetchedDataInfo.records.asInstanceOf[FileRecords].prepareForRead()
+  } catch {
+    case e: Exception => warn("failed to prepare cache for read", e)

Review Comment:
   I'd argue whether we need to log at WARN here: if there's something wrong with prepareForRead, the WARN logs will keep recurring, but it won't impact the fetch at all, only possibly performance. Could we use INFO or maybe DEBUG level here? Also, add more info in the log message, maybe:
   
   `Failed to prepare cache for read for performance improvement. This can be 
ignored if the fetch behavior works without any issue.`
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14467: Add a test to validate the replica state after processing the OFFSET_MOVED_TO_TIERED_STORAGE error [kafka]

2024-02-18 Thread via GitHub


github-actions[bot] commented on PR #14652:
URL: https://github.com/apache/kafka/pull/14652#issuecomment-1951633659

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix MetricsTest.testBrokerTopicMetricsBytesInOut assertion [kafka]

2024-02-18 Thread via GitHub


github-actions[bot] commented on PR #14744:
URL: https://github.com/apache/kafka/pull/14744#issuecomment-1951633449

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-02-18 Thread Cameron Redpath (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cameron Redpath updated KAFKA-16277:

Description: 
Consider the following scenario:

`topic-1`: 12 partitions

`topic-2`: 12 partitions

 

Of note, `topic-1` gets approximately 10 times more messages through it than 
`topic-2`. 

 

Both of these topics are consumed by a single application, single consumer 
group, which scales under load. Each member of the consumer group subscribes to 
both topics. The `partition.assignment.strategy` being used is 
`org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The application 
may start with one consumer. It consumes all partitions from both topics.

 

The problem begins when the application scales up to two consumers. What is 
seen is that all partitions from `topic-1` go to one consumer, and all 
partitions from `topic-2` go to the other consumer. In the case with one topic 
receiving more messages than the other, this results in a very imbalanced group 
where one consumer is receiving 10x the traffic of the other due to partition 
assignment.

 

This is the issue being seen in our cluster at the moment. See this graph of 
the number of messages being processed by each consumer as the group scales 
from one to four consumers:

!image-2024-02-19-13-00-28-306.png|width=537,height=612!

Things to note from this graphic:
 * With two consumers, the partitions for a topic all go to a single consumer 
each
 * With three consumers, the partitions for a topic are split between two 
consumers each
 * With four consumers, the partitions for a topic are split between three 
consumers each
 * The total number of messages being processed by each consumer group is very 
imbalanced throughout the entire period

 

With regard to the number of _partitions_ being assigned to each consumer, the 
group is balanced. However, the assignment appears to be biased so that 
partitions from the same topic go to the same consumer. In our scenario, this 
leads to very undesirable partition assignment.

 

I question if the behaviour of the assignor should be revised, so that each 
topic has its partitions maximally spread across all available members of the 
consumer group. In the above scenario, this would result in much more even 
distribution of load. The behaviour would then be:
 * With two consumers, 6 partitions from each topic go to each consumer
 * With three consumers, 4 partitions from each topic go to each consumer
 * With four consumers, 3 partitions from each topic go to each consumer

 

Of note, we only saw this behaviour after migrating to the 
`CooperativeStickyAssignor`. It was not an issue with the default partition 
assignment strategy.

 

It is possible this may be intended behaviour. In which case, what is the 
preferred workaround for our scenario? Our current workaround if we decide to 
go ahead with the update to `CooperativeStickyAssignor` may be to limit our 
consumers so they only subscribe to one topic, and have two consumer threads 
per instance of the application.  

  was:
Consider the following scenario:

`topic-1`: 12 partitions

`topic-2`: 12 partitions

 

Of note, `topic-1` gets approximately 10 times more messages through it than 
`topic-2`. 

 

Both of these topics are consumed by a single application, single consumer 
group, which scales under load. Each member of the consumer group subscribes to 
both topics. The `partition.assignment.strategy` being used is 
`org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The application 
may start with one consumer. It consumes all partitions from both topics.

 

The problem begins when the application scales up to two consumers. What is 
seen is that all partitions from `topic-1` go to one consumer, and all 
partitions from `topic-2` go to the other consumer. In the case with one topic 
receiving more messages than the other, this results in a very imbalanced group 
where one consumer is receiving 10x the traffic of the other due to partition 
assignment.

 

This is the issue being seen in our cluster at the moment. See this graph of 
the number of messages being processed by each consumer as the group scales 
from one to four consumers:

!image-2024-02-19-13-00-28-306.png|width=537,height=612!

Things to note from this graphic:
 * With two consumers, the partitions for a topic all go to a single consumer 
each
 * With three consumers, the partitions for a topic are split between two 
consumers each
 * With four consumers, the partitions for a topic are split between three 
consumers each
 * The total number of messages being processed by each consumer group is very 
imbalanced throughout the entire period

 

With regard to the number of _partitions_ being assigned to each consumer, the 
group is balanced. However, the assignment appears to be biased so that 
partitions from the same topic go to the same consumer. 

[jira] [Updated] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-02-18 Thread Cameron Redpath (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cameron Redpath updated KAFKA-16277:

Description: 
Consider the following scenario:

`topic-1`: 12 partitions

`topic-2`: 12 partitions

 

Of note, `topic-1` gets approximately 10 times more messages through it than 
`topic-2`. 

 

Both of these topics are consumed by a single application, single consumer 
group, which scales under load. Each member of the consumer group subscribes to 
both topics. The `partition.assignment.strategy` being used is 
`org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The application 
may start with one consumer. It consumes all partitions from both topics.

 

The problem begins when the application scales up to two consumers. What is 
seen is that all partitions from `topic-1` go to one consumer, and all 
partitions from `topic-2` go to the other consumer. In the case with one topic 
receiving more messages than the other, this results in a very imbalanced group 
where one consumer is receiving 10x the traffic of the other due to partition 
assignment.

 

This is the issue being seen in our cluster at the moment. See this graph of 
the number of messages being processed by each consumer as the group scales 
from one to four consumers:

!image-2024-02-19-13-00-28-306.png|width=537,height=612!

Things to note from this graphic:
 * With two consumers, the partitions for a topic all go to a single consumer 
each
 * With three consumers, the partitions for a topic are split between two 
consumers each
 * With four consumers, the partitions for a topic are split between three 
consumers each
 * The total number of messages being processed by each consumer group is very 
imbalanced throughout the entire period

 

With regard to the number of _partitions_ being assigned to each consumer, the 
group is balanced. However, the assignment appears to be biased so that 
partitions from the same topic go to the same consumer. In our scenario, this 
leads to very bad partition assignment.

 

I question if the behaviour of the assignor should be revised, so that each 
topic has its partitions maximally spread across all available members of the 
consumer group. In the above scenario, this would result in much more even 
distribution of load. The behaviour would then be:
 * With two consumers, 6 partitions from each topic go to each consumer
 * With three consumers, 4 partitions from each topic go to each consumer
 * With four consumers, 3 partitions from each topic go to each consumer

 

Of note, we only saw this behaviour after migrating to the 
`CooperativeStickyAssignor`. It was not an issue with the default partition 
assignment strategy.

 

It is possible this may be intended behaviour. In which case, what is the 
preferred workaround for our scenario? Our current workaround if we decide to 
go ahead with the update to `CooperativeStickyAssignor` may be to limit our 
consumers so they only subscribe to one topic, and have two consumer threads 
per instance of the application.  

  was:
Consider the following scenario:

`topic-1`: 12 partitions

`topic-2`: 12 partitions

 

Of note, `topic-1` gets approximately 10 times more messages through it than 
`topic-2`. 

 

Both of these topics are consumed by a single application, single consumer 
group, which scales under load. Each member of the consumer group subscribes to 
both topics. The `partition.assignment.strategy` being used is 
`org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The application 
may start with one consumer. It consumes all partitions from both topics.

 

The problem begins when the application scales up to two consumers. What is 
seen is that all partitions from `topic-1` go to one consumer, and all 
partitions from `topic-2` go to the other consumer. In the case with one topic 
receiving more messages than the other, this results in a very imbalanced group 
where one consumer is receiving 10x the traffic of the other due to partition 
assignment.

 

This is the issue being seen in our cluster at the moment. See this graph of 
the number of messages being processed by each consumer as the group scales 
from one to four consumers:

!image-2024-02-19-13-00-28-306.png|width=537,height=612!

Things to note from this graphic:
 * With two consumers, the partitions for a topic all go to a single consumer 
each
 * With three consumers, the partitions for a topic are split between two 
consumers each
 * With four consumers, the partitions for a topic are split between three 
consumers each

 

With regard to the number of _partitions_ being assigned to each consumer, the 
group is balanced. However, the assignment appears to be biased so that 
partitions from the same topic go to the same consumer. In our scenario, this 
leads to very bad partition assignment.

 

I question if the behaviour of the assignor should be 

[jira] [Created] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-02-18 Thread Cameron Redpath (Jira)
Cameron Redpath created KAFKA-16277:
---

 Summary: CooperativeStickyAssignor does not spread topics evenly 
among consumer group
 Key: KAFKA-16277
 URL: https://issues.apache.org/jira/browse/KAFKA-16277
 Project: Kafka
  Issue Type: Bug
Reporter: Cameron Redpath
 Attachments: image-2024-02-19-13-00-28-306.png

Consider the following scenario:

`topic-1`: 12 partitions

`topic-2`: 12 partitions

 

Of note, `topic-1` gets approximately 10 times more messages through it than 
`topic-2`. 

 

Both of these topics are consumed by a single application, single consumer 
group, which scales under load. Each member of the consumer group subscribes to 
both topics. The `partition.assignment.strategy` being used is 
`org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The application 
may start with one consumer. It consumes all partitions from both topics.

 

The problem begins when the application scales up to two consumers. What is 
seen is that all partitions from `topic-1` go to one consumer, and all 
partitions from `topic-2` go to the other consumer. In the case with one topic 
receiving more messages than the other, this results in a very imbalanced group 
where one consumer is receiving 10x the traffic of the other due to partition 
assignment.

 

This is the issue being seen in our cluster at the moment. See this graph of 
the number of messages being processed by each consumer as the group scales 
from one to four consumers:

!image-2024-02-19-13-00-28-306.png|width=537,height=612!

Things to note from this graphic:
 * With two consumers, the partitions for a topic all go to a single consumer 
each
 * With three consumers, the partitions for a topic are split between two 
consumers each
 * With four consumers, the partitions for a topic are split between three 
consumers each

 

With regard to the number of _partitions_ being assigned to each consumer, the 
group is balanced. However, the assignment appears to be biased so that 
partitions from the same topic go to the same consumer. In our scenario, this 
leads to very bad partition assignment.

 

I question if the behaviour of the assignor should be revised, so that each 
topic has its partitions maximally spread across all available members of the 
consumer group. In the above scenario, this would result in much more even 
distribution of load. The behaviour would then be:
 * With two consumers, 6 partitions from each topic go to each consumer
 * With three consumers, 4 partitions from each topic go to each consumer
 * With four consumers, 3 partitions from each topic go to each consumer

 

Of note, we only saw this behaviour after migrating to the 
`CooperativeStickyAssignor`. It was not an issue with the default partition 
assignment strategy.

 

It is possible this may be intended behaviour. In which case, what is the 
preferred workaround for our scenario? Our current workaround if we decide to 
go ahead with the update to `CooperativeStickyAssignor` may be to limit our 
consumers so they only subscribe to one topic, and have two consumer threads 
per instance of the application.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: Updating comments to match the code. [kafka]

2024-02-18 Thread via GitHub


appchemist opened a new pull request, #15388:
URL: https://github.com/apache/kafka/pull/15388

   While reading the Consumer code, I found that the code and comments did not 
match.
   I have created a PR to fix these inconsistencies.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7504 Mitigate sendfile(2) blocking in network threads by warming up fetch data [kafka]

2024-02-18 Thread via GitHub


ocadaruma commented on code in PR #14289:
URL: https://github.com/apache/kafka/pull/14289#discussion_r1493877586


##
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##
@@ -421,6 +446,18 @@ private AbstractIterator batchIterator(int start) {
 return new RecordBatchIterator<>(inputStream);
 }
 
+/**
+ * Try populating OS page cache with file content
+ */
+public void prepareForRead() throws IOException {
+if (DEVNULL_PATH != null) {
+long size = Math.min(channel.size(), end) - start;
+try (FileChannel devnullChannel = FileChannel.open(DEVNULL_PATH, StandardOpenOption.WRITE)) {
+channel.transferTo(start, size, devnullChannel);

Review Comment:
   > do we want to pre-populate the entire content represented by the 
FileRecords
   
   Given that the FileRecords here represents a slice of the file adjusted to the fetch size, we want to pre-populate the entire content here, because even a single `writeTo` might read only a smaller part, while the network thread eventually needs the entire content anyway.
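
As a standalone illustration of the warm-up trick being discussed (this is not the Kafka code in the PR), the sketch below transfers a file slice to /dev/null with FileChannel.transferTo, which is what lets the kernel pull the data into the page cache before a network thread serves it. The file path and slice bounds are placeholders, and /dev/null only exists on POSIX systems.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class PageCacheWarmup {
    public static void main(String[] args) throws IOException {
        Path log = Path.of("/tmp/example.log");   // placeholder file
        Path devNull = Path.of("/dev/null");      // POSIX only
        try (FileChannel src = FileChannel.open(log, StandardOpenOption.READ);
             FileChannel sink = FileChannel.open(devNull, StandardOpenOption.WRITE)) {
            long start = 0;                       // a fetch would use the slice's start offset
            long size = src.size();               // and the slice's length instead of the whole file
            long transferred = 0;
            while (transferred < size) {          // transferTo may move fewer bytes than requested
                transferred += src.transferTo(start + transferred, size - transferred, sink);
            }
        }
    }
}
```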



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7504 Mitigate sendfile(2) blocking in network threads by warming up fetch data [kafka]

2024-02-18 Thread via GitHub


ocadaruma commented on code in PR #14289:
URL: https://github.com/apache/kafka/pull/14289#discussion_r1493877586


##
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##
@@ -421,6 +446,18 @@ private AbstractIterator batchIterator(int start) {
 return new RecordBatchIterator<>(inputStream);
 }
 
+/**
+ * Try populating OS page cache with file content
+ */
+public void prepareForRead() throws IOException {
+if (DEVNULL_PATH != null) {
+long size = Math.min(channel.size(), end) - start;
+try (FileChannel devnullChannel = FileChannel.open(DEVNULL_PATH, StandardOpenOption.WRITE)) {
+channel.transferTo(start, size, devnullChannel);

Review Comment:
   > do we want to pre-populate the entire content represented by the 
FileRecords
   
   Given that the FileRecords here represents a slice of the file adjusted to the max fetch bytes, we want to pre-populate the entire content here, because even a single `writeTo` might read only a smaller part, while the network thread eventually needs the entire content anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7504 Mitigate sendfile(2) blocking in network threads by warming up fetch data [kafka]

2024-02-18 Thread via GitHub


ocadaruma commented on code in PR #14289:
URL: https://github.com/apache/kafka/pull/14289#discussion_r1493875931


##
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##
@@ -421,6 +446,18 @@ private AbstractIterator batchIterator(int start) {
 return new RecordBatchIterator<>(inputStream);
 }
 
+/**
+ * Try populating OS page cache with file content
+ */
+public void prepareForRead() throws IOException {
+if (DEVNULL_PATH != null) {
+long size = Math.min(channel.size(), end) - start;

Review Comment:
   Good point. `this.size` should work here. Let me fix



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join

2024-02-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14049:

Fix Version/s: 3.7.0

> Relax Non Null Requirement for KStreamGlobalKTable Left Join
> 
>
> Key: KAFKA-14049
> URL: https://issues.apache.org/jira/browse/KAFKA-14049
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Saumya Gupta
>Assignee: Florin Akermann
>Priority: Major
> Fix For: 3.7.0
>
>
> Null Values in the Stream for a Left Join would indicate a Tombstone Message 
> that needs to be propagated if not actually joined with the GlobalKTable 
> message, hence these messages should not be ignored.
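
To make the API in question concrete, below is a hedged sketch of a KStream-GlobalKTable left join (topic names and serdes are placeholders; this is not code from the fix). The ticket's point is that a null stream-side value, a tombstone, should still reach the join rather than being ignored.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class GlobalTableLeftJoinSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> events =
                builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));
        GlobalKTable<String, String> reference =
                builder.globalTable("reference", Consumed.with(Serdes.String(), Serdes.String()));

        // Left join: the table-side value may be null; the ticket asks that a null
        // stream-side value (tombstone) also be passed through instead of dropped.
        events.leftJoin(reference,
                        (eventKey, eventValue) -> eventKey,               // map stream record to table key
                        (eventValue, refValue) -> eventValue + "|" + refValue)
              .to("joined-events");

        System.out.println(builder.build().describe());
    }
}
```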



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15713) KRaft support in SaslClientsWithInvalidCredentialsTest

2024-02-18 Thread Pavel Pozdeev (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818300#comment-17818300
 ] 

Pavel Pozdeev commented on KAFKA-15713:
---

Can I pick this up?

> KRaft support in SaslClientsWithInvalidCredentialsTest
> --
>
> Key: KAFKA-15713
> URL: https://issues.apache.org/jira/browse/KAFKA-15713
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in SaslClientsWithInvalidCredentialsTest in 
> core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
>  need to be updated to support KRaft
> 125 : def testAclCliWithAuthorizer(): Unit = {
> 130 : def testAclCliWithAdminAPI(): Unit = {
> 186 : def testProducerConsumerCliWithAuthorizer(): Unit = {
> 191 : def testProducerConsumerCliWithAdminAPI(): Unit = {
> 197 : def testAclCliWithClientId(): Unit = {
> 236 : def testAclsOnPrefixedResourcesWithAuthorizer(): Unit = {
> 241 : def testAclsOnPrefixedResourcesWithAdminAPI(): Unit = {
> 268 : def testInvalidAuthorizerProperty(): Unit = {
> 276 : def testPatternTypes(): Unit = {
> Scanned 336 lines. Found 0 KRaft tests out of 9 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15625: Do not flush global state store at each commit [kafka]

2024-02-18 Thread via GitHub


AyoubOm commented on code in PR #15361:
URL: https://github.com/apache/kafka/pull/15361#discussion_r1493740130


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java:
##
@@ -217,21 +215,53 @@ public void shouldFlushStateManagerWithOffsets() {
 final Map expectedOffsets = new HashMap<>();
 expectedOffsets.put(t1, 52L);
 expectedOffsets.put(t2, 100L);
+
 globalStateTask.initialize();
 globalStateTask.update(record(topic1, 1, 51, "foo".getBytes(), 
"foo".getBytes()));
 globalStateTask.flushState();
+
 assertEquals(expectedOffsets, stateMgr.changelogOffsets());
+assertTrue(stateMgr.flushed);
 }
 
 @Test
 public void shouldCheckpointOffsetsWhenStateIsFlushed() {
 final Map expectedOffsets = new HashMap<>();
 expectedOffsets.put(t1, 102L);
 expectedOffsets.put(t2, 100L);
+
 globalStateTask.initialize();
 globalStateTask.update(record(topic1, 1, 101, "foo".getBytes(), 
"foo".getBytes()));
 globalStateTask.flushState();
-assertThat(stateMgr.changelogOffsets(), equalTo(expectedOffsets));
+
+assertEquals(expectedOffsets, stateMgr.changelogOffsets());
+assertTrue(stateMgr.checkpointWritten);
+}
+
+@Test
+public void shouldNotCheckpointIfNotReceivedEnoughRecords() {
+globalStateTask.initialize();
+globalStateTask.update(record(topic1, 1, 9000L, "foo".getBytes(), 
"foo".getBytes()));
+globalStateTask.maybeCheckpoint();
+
+assertEquals(offsets, stateMgr.changelogOffsets());
+assertFalse(stateMgr.flushed);
+assertFalse(stateMgr.checkpointWritten);
+}
+
+@Test
+public void shouldCheckpointIfReceivedEnoughRecords() {
+final Map expectedOffsets = new HashMap<>();
+expectedOffsets.put(t1, 10051L); // t1 advanced by 10,001 records

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org