[GitHub] [kafka] kamalcph commented on pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)

2023-09-04 Thread via GitHub


kamalcph commented on PR #14329:
URL: https://github.com/apache/kafka/pull/14329#issuecomment-1705975965

   Rebased the PR against trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan merged pull request #14324: KAFKA-15424: Make the transaction verification a dynamic configuration

2023-09-04 Thread via GitHub


jolshan merged PR #14324:
URL: https://github.com/apache/kafka/pull/14324


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15309) Add custom error handler to Producer

2023-09-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761924#comment-17761924
 ] 

Matthias J. Sax commented on KAFKA-15309:
-

Sure, the ticket is up for grabs. Note that we will need a KIP for this to get 
a proper and approved design -> 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] 

Let us know if you have any questions about the KIP process.

> Add custom error handler to Producer
> 
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
> Attachments: KafkaProducerReproducer.java, app.log
>
>
> The producer collects multiple records into batches, and a single 
> record-specific error might fail the whole batch (e.g., `RecordTooLargeException`).
> This ticket suggests adding a per-record error handler that allows users to 
> opt into skipping bad records without failing the whole batch (similar to the 
> Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
> https://issues.apache.org/jira/browse/KAFKA-15259, which inspired this ticket.
> Another example for which a production exception handler could be useful is when 
> a user tries to write into a non-existing topic, which returns a retryable 
> error code; with infinite retries the producer would hang retrying forever. A 
> handler could help to break the infinite retry loop.
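
A rough sketch of what such a per-record handler could look like is shown below. The
interface name, its method, and the response enum are hypothetical illustrations only
(the real API still needs a KIP); it simply mirrors the shape of the Kafka Streams
`ProductionExceptionHandler`:

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.Configurable;

    // Hypothetical interface -- for illustration of the proposal only.
    public interface ProducerExceptionHandler extends Configurable {

        enum HandlerResponse {
            FAIL,   // fail the whole batch (current behavior)
            SKIP    // drop the bad record and continue with the rest of the batch
        }

        // Called for record-specific errors such as RecordTooLargeException.
        HandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception);
    }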



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761918#comment-17761918
 ] 

Matthias J. Sax edited comment on KAFKA-15417 at 9/5/23 1:57 AM:
-

I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set the config

`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__`

to zero to see if it resolves the issue?


was (Author: mjsax):
I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set the config

`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__`

to zero to see if it resolves the issue?

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> {_}joinWindow{_}.{_}after=0 and joinWindow.before=30s{_}, any new message on 
> stream 1 that can not be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761918#comment-17761918
 ] 

Matthias J. Sax edited comment on KAFKA-15417 at 9/5/23 1:56 AM:
-

I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set the config

`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__`

to zero to see if it resolves the issue?


was (Author: mjsax):
I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set the config 
`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__` to zero 
to see if it resolves the issue?

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> {_}joinWindow{_}.{_}after=0 and joinWindow.before=30s{_}, any new message on 
> stream 1 that can not be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761918#comment-17761918
 ] 

Matthias J. Sax edited comment on KAFKA-15417 at 9/5/23 1:56 AM:
-

I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set the config

`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__`

to zero to see if it resolves the issue?


was (Author: mjsax):
I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set the config

`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__`

to zero to see if it resolves the issue?

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> {_}joinWindow{_}.{_}after=0 and joinWindow.before=30s{_}, any new message on 
> stream 1 that can not be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761918#comment-17761918
 ] 

Matthias J. Sax edited comment on KAFKA-15417 at 9/5/23 1:55 AM:
-

I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set the config 
`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__` to zero 
to see if it resolves the issue?


was (Author: mjsax):
I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set config 
`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__` to zero to see 
if it resolves the issue?

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> {_}joinWindow{_}.{_}after=0 and joinWindow.before=30s{_}, any new message on 
> stream 1 that can not be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761918#comment-17761918
 ] 

Matthias J. Sax commented on KAFKA-15417:
-

I believe it could be related to an internal "throttling" mechanism that was 
added to improve throughput.

Can you try to set the config 
`__emit.interval.ms.kstreams.outer.join.spurious.results.fix__` to zero to see 
if it resolves the issue?
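
For reference, a minimal sketch of how that internal config can be supplied, assuming
it is passed as a plain property next to the regular Streams configs (the application
id and bootstrap servers below are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class JoinDebugConfig {
        public static Properties buildProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-debug-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Internal throttling interval for the outer/left-join fix; 0 emits immediately.
            props.put("__emit.interval.ms.kstreams.outer.join.spurious.results.fix__", 0L);
            return props;
        }
    }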

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> {_}joinWindow{_}.{_}after=0 and joinWindow.before=30s{_}, any new message on 
> stream 1 that can not be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
>  
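
For context, a minimal sketch of the reported setup (topic names, serdes, and the value
joiner are placeholders), matching the before = 5000 ms / after = 0 window described above:

    import java.time.Duration;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.KStream;

    public class LeftJoinSketch {
        public static void build(StreamsBuilder builder) {
            KStream<String, String> stream1 = builder.stream("input-1");
            KStream<String, String> stream2 = builder.stream("input-2");

            // before = 5000 ms, after = 0, as in the reporter's configuration.
            JoinWindows joinWindow = JoinWindows
                .ofTimeDifferenceWithNoGrace(Duration.ofMillis(5000))
                .after(Duration.ZERO);

            stream1.leftJoin(stream2, (v1, v2) -> v1 + "-" + v2, joinWindow)
                   .to("output");
        }
    }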



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15383) Replace EasyMock with Mockito for KTableImplTest

2023-09-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15383:

Component/s: streams
 unit tests

> Replace EasyMock with Mockito for KTableImplTest
> 
>
> Key: KAFKA-15383
> URL: https://issues.apache.org/jira/browse/KAFKA-15383
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Fei Xie
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15385) Replace EasyMock with Mockito for AbstractStreamTest

2023-09-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15385:

Component/s: streams
 unit tests

> Replace EasyMock with Mockito for AbstractStreamTest
> 
>
> Key: KAFKA-15385
> URL: https://issues.apache.org/jira/browse/KAFKA-15385
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Fei Xie
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15382) Replace EasyMock with Mockito for KStreamTransformValuesTest

2023-09-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15382:

Component/s: streams
 unit tests

> Replace EasyMock with Mockito for KStreamTransformValuesTest
> 
>
> Key: KAFKA-15382
> URL: https://issues.apache.org/jira/browse/KAFKA-15382
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Fei Xie
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15384) Replace EasyMock with Mockito for KTableTransformValuesTest

2023-09-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15384:

Component/s: streams
 unit tests

> Replace EasyMock with Mockito for KTableTransformValuesTest
> ---
>
> Key: KAFKA-15384
> URL: https://issues.apache.org/jira/browse/KAFKA-15384
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Fei Xie
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd merged pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)

2023-09-04 Thread via GitHub


satishd merged PR #14307:
URL: https://github.com/apache/kafka/pull/14307


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)

2023-09-04 Thread via GitHub


satishd commented on PR #14307:
URL: https://github.com/apache/kafka/pull/14307#issuecomment-1705769550

   A couple of unrelated tests failed in the Jenkins jobs; merging to 
trunk and 3.6.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14936) Add Grace Period To Stream Table Join

2023-09-04 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson resolved KAFKA-14936.

Resolution: Done

> Add Grace Period To Stream Table Join
> -
>
> Key: KAFKA-14936
> URL: https://issues.apache.org/jira/browse/KAFKA-14936
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
>  Labels: kip, streams
> Fix For: 3.6.0
>
>
> Include the grace period for stream-table joins as described in KIP-923.
> Also add a RocksDB time-based queueing implementation of 
> `TimeOrderedKeyValueBuffer`.
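
A minimal sketch of how the new grace period can be used on a stream-table join,
assuming the `Joined#withGracePeriod(Duration)` API introduced by KIP-923 (topic
names, serdes, and the joiner are placeholders):

    import java.time.Duration;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Joined;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;

    public class GracePeriodJoinSketch {
        public static void build(StreamsBuilder builder) {
            KStream<String, String> events = builder.stream("events");
            KTable<String, String> reference = builder.table("reference-data");

            // Buffer stream records for up to 5 minutes so that late table updates
            // (by timestamp) can still be matched; backed by the time-ordered buffer.
            events.join(reference,
                        (event, ref) -> event + "/" + ref,
                        Joined.<String, String, String>as("grace-join")
                              .withGracePeriod(Duration.ofMinutes(5)))
                  .to("enriched-events");
        }
    }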



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14936) Add Grace Period To Stream Table Join

2023-09-04 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson updated KAFKA-14936:
---
Fix Version/s: 3.6.0

> Add Grace Period To Stream Table Join
> -
>
> Key: KAFKA-14936
> URL: https://issues.apache.org/jira/browse/KAFKA-14936
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
>  Labels: kip, streams
> Fix For: 3.6.0
>
>
> Include the grace period for stream-table joins as described in KIP-923.
> Also add a RocksDB time-based queueing implementation of 
> `TimeOrderedKeyValueBuffer`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-09-04 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1315178928


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java:
##
@@ -0,0 +1,1070 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static 
org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class OptimizedUniformAssignmentBuilderTest {
+private final UniformAssignor assignor = new UniformAssignor();
+private final Uuid topic1Uuid = Uuid.fromString("T1-A4s3VTwiI5CTbEp6POw");
+private final Uuid topic2Uuid = Uuid.fromString("T2-B4s3VTwiI5YHbPp6YUe");
+private final Uuid topic3Uuid = Uuid.fromString("T3-CU8fVTLCz5YMkLoDQsa");
+private final String topic1Name = "topic1";
+private final String topic2Name = "topic2";
+private final String topic3Name = "topic3";
+private final String memberA = "A";
+private final String memberB = "B";
+private final String memberC = "C";
+
+@Test
+public void testOneMemberNoTopicSubscription() {
+SubscribedTopicMetadata subscribedTopicMetadata = new 
SubscribedTopicMetadata(
+Collections.singletonMap(
+topic1Uuid,
+new TopicMetadata(
+topic1Uuid,
+topic1Name,
+3,
+mkMapOfPartitionRacks(3)
+)
+)
+);
+
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+memberA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.emptyList(),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec, 
subscribedTopicMetadata);
+
+assertEquals(Collections.emptyMap(), groupAssignment.members());
+}
+
+@Test
+public void testOneMemberSubscribedToNonexistentTopic() {
+SubscribedTopicMetadata subscribedTopicMetadata = new 
SubscribedTopicMetadata(
+Collections.singletonMap(
+topic1Uuid,
+new TopicMetadata(
+topic1Uuid,
+topic1Name,
+3,
+mkMapOfPartitionRacks(3)
+)
+)
+);
+
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+memberA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.singletonList(topic2Uuid),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec, 
subscribedTopicMetadata);
+
+assertEquals(Collections.emptyMap(), groupAssignment.members());
+}
+
+@Test
+public void 
testFirstAssignmentTwoMembersSubscribedToTwoTopicsNoMemberRacks() {
+Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
+topicMetadata.put(topic1Uuid, new TopicMetadata(
+topic1Uuid,
+topic1Name,
+3,
+mkMapOfPartitionRacks(3)
+));
+

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-09-04 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1315178781


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java:
##
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * The Uniform Assignor distributes Kafka topic partitions among group members 
for balanced assignment.
+ * The assignor employs two different strategies based on the nature of topic
+ * subscriptions across the group members:
+ * 
+ * 
+ *  Optimized Uniform Assignment Builder:  This strategy is 
used when all members have subscribed
+ * to the same set of topics.
+ * 
+ * 
+ *  General Uniform Assignment Builder:  This strategy is used 
when members have varied topic
+ * subscriptions.
+ * 
+ * 
+ *
+ * The appropriate strategy is automatically chosen based on the current 
members' topic subscriptions.
+ *
+ * @see OptimizedUniformAssignmentBuilder
+ * @see GeneralUniformAssignmentBuilder
+ */
+public class UniformAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(UniformAssignor.class);
+public static final String UNIFORM_ASSIGNOR_NAME = "uniform";
+
+@Override
+public String name() {
+return UNIFORM_ASSIGNOR_NAME;
+}
+
+/**
+ * Perform the group assignment given the current members and
+ * topic metadata.
+ *
+ * @param assignmentSpecThe member assignment spec.
+ * @param subscribedTopicDescriber  The topic and cluster metadata 
describer {@link SubscribedTopicDescriber}.
+ * @return The new assignment for the group.
+ */
+@Override
+public GroupAssignment assign(
+AssignmentSpec assignmentSpec,
+SubscribedTopicDescriber subscribedTopicDescriber
+) throws PartitionAssignorException {
+
+AbstractAssignmentBuilder assignmentBuilder;
+if (allSubscriptionsEqual(assignmentSpec.members())) {
+log.debug("Detected that all members are subscribed to the same 
set of topics, invoking the "
++ "optimized assignment algorithm");
+assignmentBuilder = new 
OptimizedUniformAssignmentBuilder(assignmentSpec, subscribedTopicDescriber);
+} else {
+assignmentBuilder = new GeneralUniformAssignmentBuilder();
+log.debug("Detected that all members are subscribed to a different 
set of topics, invoking the "
++ "general assignment algorithm");
+}
+return assignmentBuilder.buildAssignment();
+}
+
+/**
+ * Determines if all members are subscribed to the same list of topic IDs.
+ *
+ * @param members A map of member identifiers to their respective {@code 
AssignmentMemberSpec}.
+ *Assumes the map is non-empty.
+ * @return true if all members have the same subscription list of topic 
IDs,
+ * false otherwise.
+ */
+private boolean allSubscriptionsEqual(Map<String, AssignmentMemberSpec> 
members) {
+Set<Uuid> firstSubscriptionSet = new 
HashSet<>(members.values().iterator().next().subscribedTopicIds());

Review Comment:
   Added the check, but I don't think we need to throw an IllegalStateException.
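
   For illustration, a minimal sketch of the `allSubscriptionsEqual` check quoted
   above (not the PR's final code; it assumes `AssignmentMemberSpec#subscribedTopicIds()`
   returns the member's subscribed topic IDs and that the usual `java.util` imports are
   present):

       // Sketch: true when every member has exactly the same set of subscribed topic IDs.
       private boolean allSubscriptionsEqual(Map<String, AssignmentMemberSpec> members) {
           Set<Uuid> firstSubscriptionSet =
               new HashSet<>(members.values().iterator().next().subscribedTopicIds());
           for (AssignmentMemberSpec member : members.values()) {
               Collection<Uuid> subscription = member.subscribedTopicIds();
               if (subscription.size() != firstSubscriptionSet.size()
                       || !firstSubscriptionSet.containsAll(subscription)) {
                   return false;
               }
           }
           return true;
       }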



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-09-04 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1315178560


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns Kafka partitions to members of a consumer group ensuring a balanced 
distribution with
+ * considerations for sticky assignments and rack-awareness.
+ * The order of priority of properties during the assignment will be: balance 
> rack matching (when applicable) > stickiness.
+ *
+ *  Here's the step-by-step breakdown of the assignment process:
+ *
+ * 
+ *  Compute the quotas of partitions for each member based on the 
total partitions and member count.
+ *  For existing assignments, retain partitions based on the 
determined quota and member's rack compatibility.
+ *  If a partition's rack mismatches with its member, track it with 
its prior owner.
+ *  Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.
+ *  Derive the unassigned partitions by taking the difference between 
total partitions and the sticky assignments.
+ *  Depending on members needing extra partitions, select members from 
the potentially unfilled list and add them to the unfilled list.
+ *  Proceed with a round-robin assignment adhering to rack awareness.
+ *  For each unassigned partition, locate the first compatible member 
from the unfilled list.
+ *  If no rack-compatible member is found, revert to the tracked 
current owner.
+ *  If that member can't accommodate the partition due to quota 
limits, resort to a generic round-robin assignment.
+ * 
+ */
+public class OptimizedUniformAssignmentBuilder extends 
UniformAssignor.AbstractAssignmentBuilder {
+private static final Logger log = 
LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+private final AssignmentSpec assignmentSpec;
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+// List of topics subscribed to by all members.
+private final List subscriptionList;
+private final RackInfo rackInfo;
+// Count of members to receive an extra partition beyond the minimum quota,
+// to account for the distribution of the remaining partitions.
+private int remainingMembersToGetExtraPartition;
+// Map of members to the remaining number of partitions needed to meet the 
minimum quota,
+// including members eligible for an extra partition.
+private final Map potentiallyUnfilledMembers;
+// Members mapped to the remaining number of partitions needed to meet the 
full quota.
+// Full quota = minQuota + one extra partition (if applicable).
+private Map unfilledMembers;
+private List unassignedPartitions;
+private final Map newAssignment;
+// Tracks the current owner of each partition when using rack-aware 
strategy.
+// Current refers to the existing assignment.
+private final Map currentPartitionOwners;
+// Indicates if a rack aware assignment can be done.
+// True if racks are defined for both members and partitions.
+boolean useRackAwareStrategy;
+
+OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+this.assignmentSpec = assignmentSpec;
+this.subscribedTopicDescriber = subscribedTopicDescriber;
+subscriptionList = new 
ArrayList<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
+
+RackInfo rackInfo = new RackInfo(assignmentSpec, 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-09-04 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1315178473


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns Kafka partitions to members of a consumer group ensuring a balanced 
distribution with
+ * considerations for sticky assignments and rack-awareness.
+ * The order of priority of properties during the assignment will be: balance 
> rack matching (when applicable) > stickiness.
+ *
+ *  Here's the step-by-step breakdown of the assignment process:
+ *
+ * 
+ *  Compute the quotas of partitions for each member based on the 
total partitions and member count.
+ *  For existing assignments, retain partitions based on the 
determined quota and member's rack compatibility.
+ *  If a partition's rack mismatches with its member, track it with 
its prior owner.
+ *  Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.
+ *  Derive the unassigned partitions by taking the difference between 
total partitions and the sticky assignments.
+ *  Depending on members needing extra partitions, select members from 
the potentially unfilled list and add them to the unfilled list.
+ *  Proceed with a round-robin assignment adhering to rack awareness.
+ *  For each unassigned partition, locate the first compatible member 
from the unfilled list.
+ *  If no rack-compatible member is found, revert to the tracked 
current owner.
+ *  If that member can't accommodate the partition due to quota 
limits, resort to a generic round-robin assignment.
+ * 
+ */
+public class OptimizedUniformAssignmentBuilder extends 
UniformAssignor.AbstractAssignmentBuilder {
+private static final Logger log = 
LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+private final AssignmentSpec assignmentSpec;
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+// List of topics subscribed to by all members.
+private final List subscriptionList;
+private final RackInfo rackInfo;
+// Count of members to receive an extra partition beyond the minimum quota,
+// to account for the distribution of the remaining partitions.
+private int remainingMembersToGetExtraPartition;
+// Map of members to the remaining number of partitions needed to meet the 
minimum quota,
+// including members eligible for an extra partition.
+private final Map potentiallyUnfilledMembers;
+// Members mapped to the remaining number of partitions needed to meet the 
full quota.
+// Full quota = minQuota + one extra partition (if applicable).
+private Map unfilledMembers;
+private List unassignedPartitions;
+private final Map newAssignment;
+// Tracks the current owner of each partition when using rack-aware 
strategy.
+// Current refers to the existing assignment.
+private final Map currentPartitionOwners;
+// Indicates if a rack aware assignment can be done.
+// True if racks are defined for both members and partitions.
+boolean useRackAwareStrategy;
+
+OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+this.assignmentSpec = assignmentSpec;
+this.subscribedTopicDescriber = subscribedTopicDescriber;
+subscriptionList = new 
ArrayList<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());

Review Comment:
   as discussed on call, I added a check in the 

[GitHub] [kafka] kamalcph commented on a diff in pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)

2023-09-04 Thread via GitHub


kamalcph commented on code in PR #14329:
URL: https://github.com/apache/kafka/pull/14329#discussion_r1315134679


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -379,9 +379,9 @@ public void stopPartitions(Set 
topicPartitions,
 LOGGER.error("Error while stopping the partition: {}, delete: 
{}", tpId.topicPartition(), delete, ex);
 }
 });
-remoteLogMetadataManager.onStopPartitions(topicIdPartitions);
 if (delete) {
 // NOTE: this#stopPartitions method is called when Replica state 
changes to Offline and ReplicaDeletionStarted
+remoteLogMetadataManager.onStopPartitions(topicIdPartitions);

Review Comment:
   yes, you're right. When the replica is moved to another node, then the 
RLMM#stopPartition won't be called. We can handle this case in TBRLMM by adding 
a test to ensure that if the same replica is reverted back to the previous 
node, then it gets handled gracefully. But, we need to handle this case for any 
RLMM implementation.
   
   Another way to fix this issue is to always invoke `RLMM#stopPartition` when 
`deleteLocalLog` is set to true, but that requires a good amount of changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)

2023-09-04 Thread via GitHub


kamalcph commented on code in PR #14329:
URL: https://github.com/apache/kafka/pull/14329#discussion_r1315134679


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -379,9 +379,9 @@ public void stopPartitions(Set 
topicPartitions,
 LOGGER.error("Error while stopping the partition: {}, delete: 
{}", tpId.topicPartition(), delete, ex);
 }
 });
-remoteLogMetadataManager.onStopPartitions(topicIdPartitions);
 if (delete) {
 // NOTE: this#stopPartitions method is called when Replica state 
changes to Offline and ReplicaDeletionStarted
+remoteLogMetadataManager.onStopPartitions(topicIdPartitions);

Review Comment:
   yes, you're right. When the replica is moved to another node, then the 
RLMM#stopPartition won't be called. We can handle this case in TBRLMM by adding 
a test to ensure that if the same replica is reverted back to the previous 
node, then it gets handled gracefully. But, we need to handle this case for any 
RLMM implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.

2023-09-04 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15388:
-
Issue Type: Bug  (was: Task)

> Handle topics that were having compaction as retention earlier are changed to 
> delete only retention policy and onboarded to tiered storage. 
> 
>
> Key: KAFKA-15388
> URL: https://issues.apache.org/jira/browse/KAFKA-15388
> Project: Kafka
>  Issue Type: Bug
>Reporter: Satish Duggana
>Priority: Major
> Fix For: 3.7.0
>
>
> Context: https://github.com/apache/kafka/pull/13561#discussion_r1300055517



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15420) Kafka Tiered Storage V1

2023-09-04 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15420:
-
Labels: KIP-405  (was: )

> Kafka Tiered Storage V1
> ---
>
> Key: KAFKA-15420
> URL: https://issues.apache.org/jira/browse/KAFKA-15420
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.

2023-09-04 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761859#comment-17761859
 ] 

Divij Vaidya commented on KAFKA-15388:
--

Hey [~satish.duggana] 
If supporting TS on historically compacted topics is not in scope for 3.6 
(blocked by this ticket) then we should update the 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Tiered+Storage+Early+Access+Release+Notes]
 with this information. What do you think?

Also, note that besides delete, other paths (e.g. read) are impacted by it as 
well. For example, this line of code assumes that offsets are 
contiguous: 
https://github.com/apache/kafka/blob/5785796f985aa294c12e670da221d086a7fa887c/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1326C4-L1326C4

> Handle topics that were having compaction as retention earlier are changed to 
> delete only retention policy and onboarded to tiered storage. 
> 
>
> Key: KAFKA-15388
> URL: https://issues.apache.org/jira/browse/KAFKA-15388
> Project: Kafka
>  Issue Type: Task
>Reporter: Satish Duggana
>Priority: Major
> Fix For: 3.7.0
>
>
> Context: https://github.com/apache/kafka/pull/13561#discussion_r1300055517



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on a diff in pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)

2023-09-04 Thread via GitHub


divijvaidya commented on code in PR #14329:
URL: https://github.com/apache/kafka/pull/14329#discussion_r1315061717


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -379,9 +379,9 @@ public void stopPartitions(Set 
topicPartitions,
 LOGGER.error("Error while stopping the partition: {}, delete: 
{}", tpId.topicPartition(), delete, ex);
 }
 });
-remoteLogMetadataManager.onStopPartitions(topicIdPartitions);
 if (delete) {
 // NOTE: this#stopPartitions method is called when Replica state 
changes to Offline and ReplicaDeletionStarted
+remoteLogMetadataManager.onStopPartitions(topicIdPartitions);

Review Comment:
   > Stop partition request will be called when a topic is deleted, partition 
is moved to another replica and node is stopped. 
   
   The 4th case in future is going to be when TS is disabled dynamically for a 
topic (KIP-950). In such a scenario, TBRLMM will probably still have a problem 
if the disablement has a "retain=delete". We can solve it in scope for KIP-950 
but heads-up that some changes in TBRLMM will be required.
   
   > In the first 2 cases, the request will have delete flag set to true.
   
   When a replica is moving to another node, why would `deleteRemoteLog` be true? 
Yes, `deleteLocalLog` will be true, but not `deleteRemoteLog`. Please help 
me understand this.
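
   To make the distinction concrete, here is a hypothetical sketch of how a
   stop-partitions path could treat the two flags; the flag names
   `deleteLocalLog`/`deleteRemoteLog` and the helper methods are assumptions taken
   from this discussion, not the actual RemoteLogManager API:

       // Hypothetical sketch of the control flow discussed above.
       void stopPartitions(Set<TopicIdPartition> partitions,
                           boolean deleteLocalLog,
                           boolean deleteRemoteLog) {
           // Always stop the background RLM tasks for these partitions.
           partitions.forEach(this::cancelRemoteLogTasks);

           if (deleteRemoteLog) {
               // Only a real topic deletion removes remote segments and RLMM state.
               deleteRemoteSegments(partitions);
               remoteLogMetadataManager.onStopPartitions(partitions);
           }
           // A pure replica movement (deleteLocalLog == true, deleteRemoteLog == false)
           // leaves remote data and remote-log metadata untouched.
       }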



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)

2023-09-04 Thread via GitHub


divijvaidya commented on code in PR #14329:
URL: https://github.com/apache/kafka/pull/14329#discussion_r1314961838


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##
@@ -438,10 +440,19 @@ private void initializeResources() {
 lock.writeLock().unlock();
 }
 }
+} finally {
+if (adminClient != null) {
+try {
+adminClient.close(Duration.ofSeconds(10));

Review Comment:
   Use `Utils.closeQuietly` ?
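
   A minimal sketch of that suggestion, using the existing
   `org.apache.kafka.common.utils.Utils#closeQuietly(AutoCloseable, String)` helper,
   which logs and swallows any exception thrown on close (the class and method names
   here are illustrative):

       import org.apache.kafka.clients.admin.Admin;
       import org.apache.kafka.common.utils.Utils;

       public final class AdminCloser {
           // Close the admin client without propagating exceptions from close().
           static void close(Admin adminClient) {
               Utils.closeQuietly(adminClient, "TopicBasedRemoteLogMetadataManager admin client");
           }
       }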



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)

2023-09-04 Thread via GitHub


divijvaidya commented on code in PR #14307:
URL: https://github.com/apache/kafka/pull/14307#discussion_r1315045974


##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class EnableRemoteLogOnTopicTest extends TieredStorageTestHarness 
{
+
+@Override
+public int brokerCount() {
+return 2;
+}
+
+@Override
+protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+final Integer broker0 = 0;
+final Integer broker1 = 1;
+final String topicA = "topicA";
+final Integer p0 = 0;
+final Integer p1 = 1;
+final Integer partitionCount = 2;
+final Integer replicationFactor = 2;
+final Integer maxBatchCountPerSegment = 1;
+final boolean enableRemoteLogStorage = false;
+final Map<Integer, List<Integer>> assignment = mkMap(
+mkEntry(p0, Arrays.asList(broker0, broker1)),
+mkEntry(p1, Arrays.asList(broker1, broker0))
+);
+
+builder
+.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment, assignment,
+enableRemoteLogStorage)
+// send records to partition 0
+.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 0L)
+.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"))
+// send records to partition 1
+.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 0L)
+.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"))
+// enable remote log storage
+.updateTopicConfig(topicA,
+
Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
+Collections.emptyList())
+// produce some more records to partition 0
+// Note that the segment 0-2 gets offloaded for p0, but we 
cannot expect those events deterministically
+// because the rlm-task-thread runs in background and this 
framework doesn't support it.
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new 
KeyValueSpec("k3", "v3"))
+.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 4L)
+.produce(topicA, p0, new KeyValueSpec("k3", "v3"), new 
KeyValueSpec("k4", "v4"))
+// produce some more records to partition 1
+// Note that the segment 0-2 gets offloaded for p1, but we 
cannot expect those events deterministically
+// because the rlm-task-thread runs in background and this 
framework doesn't support it.
+.expectSegmentToBeOffloaded(broker1, topicA, p1, 3, new 
KeyValueSpec("k3", "v3"))

Review Comment:
   ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15431) Add support to assert offloaded segment for already produced event in Tiered Storage Framework

2023-09-04 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15431:
-
Parent: (was: KAFKA-15420)
Issue Type: Task  (was: Sub-task)

> Add support to assert offloaded segment for already produced event in Tiered 
> Storage Framework
> --
>
> Key: KAFKA-15431
> URL: https://issues.apache.org/jira/browse/KAFKA-15431
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> See 
> [comment|https://github.com/apache/kafka/pull/14307#discussion_r1314943942]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kamalcph commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)

2023-09-04 Thread via GitHub


kamalcph commented on code in PR #14307:
URL: https://github.com/apache/kafka/pull/14307#discussion_r1315003306


##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java:
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class PartitionsExpandTest extends TieredStorageTestHarness {
+
+@Override
+public int brokerCount() {
+return 2;
+}
+
+@Override
+protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+final Integer broker0 = 0;
+final Integer broker1 = 1;
+final String topicA = "topicA";
+final Integer p0 = 0;
+final Integer p1 = 1;
+final Integer p2 = 2;
+final Integer partitionCount = 1;
+final Integer replicationFactor = 2;
+final Integer maxBatchCountPerSegment = 1;
+final boolean enableRemoteLogStorage = true;
+final List p0Assignment = Arrays.asList(broker0, broker1);
+final List p1Assignment = Arrays.asList(broker0, broker1);
+final List p2Assignment = Arrays.asList(broker1, broker0);
+
+builder
+.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment,
+Collections.singletonMap(p0, p0Assignment), 
enableRemoteLogStorage)
+// produce events to partition 0
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
+.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
+.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"))
+// expand the topicA partition-count to 3
+.createPartitions(topicA, 3, mkMap(mkEntry(p1, p1Assignment), 
mkEntry(p2, p2Assignment)))
+// consume from the beginning of the topic to read data from 
local and remote storage for partition 0
+.expectFetchFromTieredStorage(broker0, topicA, p0, 2)
+.consume(topicA, p0, 0L, 3, 2)
+
+.expectLeader(topicA, p1, broker0, false)
+.expectLeader(topicA, p2, broker1, false)
+
+// produce events to partition 1
+.expectSegmentToBeOffloaded(broker0, topicA, p1, 0, new 
KeyValueSpec("k0", "v0"))
+.expectSegmentToBeOffloaded(broker0, topicA, p1, 1, new 
KeyValueSpec("k1", "v1"))
+.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 2L)
+.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"))
+
+// produce events to partition 2
+.expectSegmentToBeOffloaded(broker1, topicA, p2, 0, new 
KeyValueSpec("k0", "v0"))
+.expectSegmentToBeOffloaded(broker1, topicA, p2, 1, new 
KeyValueSpec("k1", "v1"))
+.expectEarliestLocalOffsetInLogDirectory(topicA, p2, 2L)
+.produce(topicA, p2, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"))
+
+// produce some more events to partition 0 and expect the 
segments to be offloaded
+// NOTE: Support needs to be added to capture the offloaded 
segment event for already sent message (k2, v2)

Review Comment:
   Filed KAFKA-15431 to track this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


[jira] [Updated] (KAFKA-15431) Add support to assert offloaded segment for already produced event in Tiered Storage Framework

2023-09-04 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15431:
-
Parent: KAFKA-15420
Issue Type: Sub-task  (was: Task)

> Add support to assert offloaded segment for already produced event in Tiered 
> Storage Framework
> --
>
> Key: KAFKA-15431
> URL: https://issues.apache.org/jira/browse/KAFKA-15431
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> See 
> [comment|https://github.com/apache/kafka/pull/14307#discussion_r1314943942]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15431) Add support to assert offloaded segment for already produced event in Tiered Storage Framework

2023-09-04 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-15431:


 Summary: Add support to assert offloaded segment for already 
produced event in Tiered Storage Framework
 Key: KAFKA-15431
 URL: https://issues.apache.org/jira/browse/KAFKA-15431
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash


See [comment|https://github.com/apache/kafka/pull/14307#discussion_r1314943942]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kamalcph commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)

2023-09-04 Thread via GitHub


kamalcph commented on code in PR #14307:
URL: https://github.com/apache/kafka/pull/14307#discussion_r1314999348


##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class EnableRemoteLogOnTopicTest extends TieredStorageTestHarness 
{
+
+@Override
+public int brokerCount() {
+return 2;
+}
+
+@Override
+protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+final Integer broker0 = 0;
+final Integer broker1 = 1;
+final String topicA = "topicA";
+final Integer p0 = 0;
+final Integer p1 = 1;
+final Integer partitionCount = 2;
+final Integer replicationFactor = 2;
+final Integer maxBatchCountPerSegment = 1;
+final boolean enableRemoteLogStorage = false;
+final Map> assignment = mkMap(
+mkEntry(p0, Arrays.asList(broker0, broker1)),
+mkEntry(p1, Arrays.asList(broker1, broker0))
+);
+
+builder
+.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment, assignment,
+enableRemoteLogStorage)
+// send records to partition 0
+.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 0L)
+.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"))
+// send records to partition 1
+.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 0L)
+.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"))
+// enable remote log storage
+.updateTopicConfig(topicA,
+
Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
+Collections.emptyList())
+// produce some more records to partition 0
+// Note that the segment 0-2 gets offloaded for p0, but we 
cannot expect those events deterministically
+// because the rlm-task-thread runs in background and this 
framework doesn't support it.
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new 
KeyValueSpec("k3", "v3"))
+.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 4L)
+.produce(topicA, p0, new KeyValueSpec("k3", "v3"), new 
KeyValueSpec("k4", "v4"))
+// produce some more records to partition 1
+// Note that the segment 0-2 gets offloaded for p1, but we 
cannot expect those events deterministically
+// because the rlm-task-thread runs in background and this 
framework doesn't support it.
+.expectSegmentToBeOffloaded(broker1, topicA, p1, 3, new 
KeyValueSpec("k3", "v3"))

Review Comment:
   The 4th argument is the base-offset of the segment and the 5th argument is 
the contents of that segment. The test asserts that the segment contains only 
the expected messages in it. It is a `varargs` argument.
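
   For readers skimming the thread, a minimal usage sketch of that varargs form (editorial illustration with hypothetical values; the builder API is the one shown in the diff above):

   ```java
   // Assert that the offloaded segment with base offset 3 on broker 0 contains
   // exactly these two records and nothing else.
   builder.expectSegmentToBeOffloaded(broker0, topicA, p0, 3,
           new KeyValueSpec("k3", "v3"),
           new KeyValueSpec("k4", "v4"));
   ```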



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)

2023-09-04 Thread via GitHub


kamalcph commented on code in PR #14307:
URL: https://github.com/apache/kafka/pull/14307#discussion_r1314999348


##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class EnableRemoteLogOnTopicTest extends TieredStorageTestHarness 
{
+
+@Override
+public int brokerCount() {
+return 2;
+}
+
+@Override
+protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+final Integer broker0 = 0;
+final Integer broker1 = 1;
+final String topicA = "topicA";
+final Integer p0 = 0;
+final Integer p1 = 1;
+final Integer partitionCount = 2;
+final Integer replicationFactor = 2;
+final Integer maxBatchCountPerSegment = 1;
+final boolean enableRemoteLogStorage = false;
+final Map> assignment = mkMap(
+mkEntry(p0, Arrays.asList(broker0, broker1)),
+mkEntry(p1, Arrays.asList(broker1, broker0))
+);
+
+builder
+.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment, assignment,
+enableRemoteLogStorage)
+// send records to partition 0
+.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 0L)
+.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"))
+// send records to partition 1
+.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 0L)
+.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"))
+// enable remote log storage
+.updateTopicConfig(topicA,
+
Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
+Collections.emptyList())
+// produce some more records to partition 0
+// Note that the segment 0-2 gets offloaded for p0, but we 
cannot expect those events deterministically
+// because the rlm-task-thread runs in background and this 
framework doesn't support it.
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new 
KeyValueSpec("k3", "v3"))
+.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 4L)
+.produce(topicA, p0, new KeyValueSpec("k3", "v3"), new 
KeyValueSpec("k4", "v4"))
+// produce some more records to partition 1
+// Note that the segment 0-2 gets offloaded for p1, but we 
cannot expect those events deterministically
+// because the rlm-task-thread runs in background and this 
framework doesn't support it.
+.expectSegmentToBeOffloaded(broker1, topicA, p1, 3, new 
KeyValueSpec("k3", "v3"))

Review Comment:
   The 4th parameter is the base-offset of the segment and the 5th parameter is 
the contents of that segment. The test asserts that the segment contains only 
the expected messages in it. It is a `varargs` argument.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups

2023-09-04 Thread via GitHub


dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1314944364


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -306,6 +308,19 @@ public CoordinatorResult 
commitOffset(
 return offsetMetadataManager.commitOffset(context, request);
 }
 
+/**
+ * Handles a ListGroups request.
+ *
+ * @param statesFilter The states of the groups we want to list. If empty 
all groups are returned with their state.
+ * @return A Result containing the ListGroupsResponseData response
+ */
+public ListGroupsResponseData listGroups(
+List<String> statesFilter,
+long committedOffset
+) throws ApiException {
+return new 
ListGroupsResponseData().setGroups(groupMetadataManager.listGroups(statesFilter,
 committedOffset));
+}

Review Comment:
   I still think that we should rather return the list of groups here and create 
`ListGroupsResponseData` one level up.
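
   As a rough illustration of that suggestion (an editorial sketch only, not the final shape of the patch; the type names follow the diff above, and `ListedGroup` is the generated inner class of `ListGroupsResponseData`):

   ```java
   // GroupCoordinatorShard: return the raw listing, no response wrapper.
   public List<ListGroupsResponseData.ListedGroup> listGroups(
       List<String> statesFilter,
       long committedOffset
   ) throws ApiException {
       return groupMetadataManager.listGroups(statesFilter, committedOffset);
   }

   // One level up (e.g. in GroupCoordinatorService) the caller builds the response:
   // new ListGroupsResponseData().setGroups(listedGroups);
   ```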



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -428,9 +432,51 @@ public CompletableFuture 
listGroups(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+List<CompletableFuture<ListGroupsResponseData>> futures = new ArrayList<>();
+for (TopicPartition tp : runtime.partitions()) {
+futures.add(runtime.scheduleReadOperation(
+"list-groups",
+tp,
+(coordinator, lastCommittedOffset) -> 
coordinator.listGroups(request.statesFilter(), lastCommittedOffset)
+).exceptionally(exception -> {
+if (!(exception instanceof KafkaException)) {
+log.error("ListGroups request {} hit an unexpected 
exception: {}",
+request, exception.getMessage());
+throw new RuntimeException(exception);
+}
+if (exception instanceof CoordinatorLoadInProgressException) {
+throw new RuntimeException(exception);
+} else if (exception instanceof NotCoordinatorException) {
+log.warn("ListGroups request {} hit a 
NotCoordinatorException exception: {}",
+request, exception.getMessage());
+return new 
ListGroupsResponseData().setGroups(Collections.emptyList());
+} else {
+return new 
ListGroupsResponseData().setErrorCode(Errors.forException(exception).code());
+}
+}));
+}
+CompletableFuture<ListGroupsResponseData> responseFuture = new CompletableFuture<>();
+List<ListGroupsResponseData.ListedGroup> listedGroups = new ArrayList<>();
+AtomicInteger succeedFutureCount = new AtomicInteger();
+FutureUtils.drainFutures(futures, (data, t) -> {
+synchronized (runtime) {
+if (t != null) {
+responseFuture.completeExceptionally(new 
UnknownServerException(t.getMessage()));
+} else {
+if (data.errorCode() != Errors.NONE.code()) {
+if (!responseFuture.isDone()) {
+responseFuture.complete(data);
+}
+} else {
+listedGroups.addAll(data.groups());
+if (succeedFutureCount.addAndGet(1) == 
runtime.partitions().size()) {
+responseFuture.complete(new 
ListGroupsResponseData().setGroups(listedGroups));
+}
+}
+}
+}
+});
+return responseFuture;

Review Comment:
   There are a few issues with this code.
   1. Synchronising on `runtime` will create lock contention across all the 
callers of `listGroups`. We should rather use a local variable.
   2. The error handling seems error-prone to me. For instance, 
`NotCoordinatorException` exceptions are turned into `RuntimeException` 
exceptions and then turned into `UnknownServerException` if I understood it 
correctly. We lose the semantics along the way.
   
   I think that we could take your idea further and combine the two main steps 
into one. I am thinking about something like this:
   
   ```
   final List partitions = new 
ArrayList<>(runtime.partitions());
   final CompletableFuture future = new 
CompletableFuture<>();
   final List results = new 
ArrayList<>();
   final AtomicInteger cnt = new AtomicInteger(partitions.size());
   
   for (TopicPartition partition : partitions) {
   runtime.scheduleReadOperation(
   "list-group",
   partition,
   (coordinator, lastCommittedOffset) -> 
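
    // NOTE (editorial sketch): dajac's snippet above is truncated in this archive.
    // The following only illustrates the fan-out/aggregation pattern he describes --
    // one read per partition, aggregated under a local lock instead of synchronizing
    // on the runtime -- and it assumes the shard-level listGroups() returns
    // List<ListGroupsResponseData.ListedGroup> as suggested earlier. It is not the
    // author's code.
    final List<TopicPartition> partitions = new ArrayList<>(runtime.partitions());
    final CompletableFuture<ListGroupsResponseData> future = new CompletableFuture<>();
    final List<ListGroupsResponseData.ListedGroup> results = new ArrayList<>();
    final AtomicInteger remaining = new AtomicInteger(partitions.size());

    for (TopicPartition partition : partitions) {
        runtime.scheduleReadOperation(
            "list-groups",
            partition,
            (coordinator, lastCommittedOffset) ->
                coordinator.listGroups(request.statesFilter(), lastCommittedOffset)
        ).whenComplete((groups, exception) -> {
            synchronized (results) {            // local lock, not the runtime
                if (exception != null) {
                    future.completeExceptionally(exception);
                } else {
                    results.addAll(groups);
                    if (remaining.decrementAndGet() == 0) {
                        future.complete(new ListGroupsResponseData().setGroups(results));
                    }
                }
            }
        });
    }
    return future;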

[GitHub] [kafka] divijvaidya commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)

2023-09-04 Thread via GitHub


divijvaidya commented on code in PR #14307:
URL: https://github.com/apache/kafka/pull/14307#discussion_r1314943942


##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java:
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class PartitionsExpandTest extends TieredStorageTestHarness {
+
+@Override
+public int brokerCount() {
+return 2;
+}
+
+@Override
+protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+final Integer broker0 = 0;
+final Integer broker1 = 1;
+final String topicA = "topicA";
+final Integer p0 = 0;
+final Integer p1 = 1;
+final Integer p2 = 2;
+final Integer partitionCount = 1;
+final Integer replicationFactor = 2;
+final Integer maxBatchCountPerSegment = 1;
+final boolean enableRemoteLogStorage = true;
+final List p0Assignment = Arrays.asList(broker0, broker1);
+final List p1Assignment = Arrays.asList(broker0, broker1);
+final List p2Assignment = Arrays.asList(broker1, broker0);
+
+builder
+.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment,
+Collections.singletonMap(p0, p0Assignment), 
enableRemoteLogStorage)
+// produce events to partition 0
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
+.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
+.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"))
+// expand the topicA partition-count to 3
+.createPartitions(topicA, 3, mkMap(mkEntry(p1, p1Assignment), 
mkEntry(p2, p2Assignment)))
+// consume from the beginning of the topic to read data from 
local and remote storage for partition 0
+.expectFetchFromTieredStorage(broker0, topicA, p0, 2)
+.consume(topicA, p0, 0L, 3, 2)
+
+.expectLeader(topicA, p1, broker0, false)
+.expectLeader(topicA, p2, broker1, false)
+
+// produce events to partition 1
+.expectSegmentToBeOffloaded(broker0, topicA, p1, 0, new 
KeyValueSpec("k0", "v0"))
+.expectSegmentToBeOffloaded(broker0, topicA, p1, 1, new 
KeyValueSpec("k1", "v1"))
+.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 2L)
+.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"))
+
+// produce events to partition 2
+.expectSegmentToBeOffloaded(broker1, topicA, p2, 0, new 
KeyValueSpec("k0", "v0"))
+.expectSegmentToBeOffloaded(broker1, topicA, p2, 1, new 
KeyValueSpec("k1", "v1"))
+.expectEarliestLocalOffsetInLogDirectory(topicA, p2, 2L)
+.produce(topicA, p2, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"))
+
+// produce some more events to partition 0 and expect the 
segments to be offloaded
+// NOTE: Support needs to be added to capture the offloaded 
segment event for already sent message (k2, v2)

Review Comment:
   we risk forgetting this commented line. please add a JIRA



##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java:
##
@@ -0,0 +1,89 @@

[GitHub] [kafka] dopuskh3 closed pull request #14332: [MINOR] Fix TopicPartition comparison

2023-09-04 Thread via GitHub


dopuskh3 closed pull request #14332: [MINOR] Fix TopicPartition comparison
URL: https://github.com/apache/kafka/pull/14332


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15430) Kafka creates replica partition on controller node

2023-09-04 Thread Andrii Vysotskiy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrii Vysotskiy updated KAFKA-15430:
-
Priority: Major  (was: Minor)

> Kafka creates replica partition on controller node
> 
>
> Key: KAFKA-15430
> URL: https://issues.apache.org/jira/browse/KAFKA-15430
> Project: Kafka
>  Issue Type: Test
>  Components: kraft
>Affects Versions: 3.5.1
>Reporter: Andrii Vysotskiy
>Priority: Major
>
> I have a configuration of 5 nodes (KRaft mode) with the following roles: 4 
> broker+controller and 1 controller. I create a topic with replication factor 5; 
> it is created, and describe shows that the topic partition has 5 replicas.
>  
> {{/opt/kafka/latest/bin/kafka-topics.sh --create 
> --bootstrap-server=dc1-prod-kafka-001-vs:9092 --replication-factor 5 
> --partitions 1 --topic test5}}
>  
> /opt/kafka/latest/bin/kafka-topics.sh --describe --topic test5 
> --bootstrap-server=dc1-prod-kafka-001-vs:9092
> Topic: test5 TopicId: amuqr8EgRmqeKryUHZwsMA PartitionCount: 1 
> ReplicationFactor: 5 Configs: segment.bytes=1073741824
> Topic: test5 Partition: 0 Leader: 3 Replicas: 3,4,1,2,5 Isr: 3,4,1,2}}
>  
> There are 5 replicas but only 4 in the ISR. Why does Kafka initially allow you 
> to create a replica on the controller node, although in reality the replica is 
> not created on the controller node and there are no topic files in the log 
> directory?
> Is this expected behavior or not? Thanks.
> I want to understand whether such behavior is the norm for Kafka.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dengziming commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-04 Thread via GitHub


dengziming commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1314913328


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -320,6 +323,19 @@ public short registerBrokerRecordVersion() {
 }
 }
 
+public short registerControllerRecordVersion() {
+if (isAtLeast(MetadataVersion.IBP_3_6_IV2)) {

Review Comment:
   We are not using `IBP_3_7_IV0`; does this mean an old-version controller can 
register a controller with a new-version controller?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #14332: [MINOR] Fix TopicPartition comparison

2023-09-04 Thread via GitHub


satishd commented on PR #14332:
URL: https://github.com/apache/kafka/pull/14332#issuecomment-1705221588

   Thanks @dopuskh3 for catching and raising the PR. As @showuon mentioned, it 
is addressed by 
[PR](https://github.com/apache/kafka/pull/14307/files#diff-380e4d8859ea9148f21794c09039425c82d9012a392c2dbbe1ce2ec8677a1970R304)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)

2023-09-04 Thread via GitHub


kamalcph commented on code in PR #14307:
URL: https://github.com/apache/kafka/pull/14307#discussion_r1314900757


##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+
+public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
+
+@Override
+public int brokerCount() {
+return 1;
+}
+
+@Override
+protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+final Integer broker0 = 0;
+final String topicA = "topicA";
+final Integer p0 = 0;
+final Integer partitionCount = 1;
+final Integer replicationFactor = 1;
+final Integer maxBatchCountPerSegment = 1;
+final Map> replicaAssignment = null;
+final boolean enableRemoteLogStorage = true;
+final int beginEpoch = 0;
+final long startOffset = 3;
+
+// Create topicA with 1 partition, 1 RF and enabled with remote 
storage.
+builder.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment, replicaAssignment,
+enableRemoteLogStorage)
+// produce events to partition 0 and expect 3 segments to be 
offloaded
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new 
KeyValueSpec("k2", "v2"))
+.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
+.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", 
"v3"))
+// update the topic config such that it triggers the deletion 
of segments
+.updateTopicConfig(topicA, configsToBeAdded(), 
Collections.emptyList())

Review Comment:
   The default retention time is 7 days and retention bytes is unlimited, so the 
config update is required to trigger the deletion of the offloaded segments.
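
   For illustration, a subclass might trigger deletion by shrinking retention, for example (editorial sketch only; `configsToBeAdded()` is assumed to be the hook implemented by the concrete subclasses of this base test):

   ```java
   // Retain data for ~1 ms so the already-offloaded segments become eligible for deletion.
   protected Map<String, String> configsToBeAdded() {
       return Collections.singletonMap(TopicConfig.RETENTION_MS_CONFIG, "1");
   }
   ```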



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dopuskh3 commented on pull request #14332: [MINOR] Fix TopicPartition comparison

2023-09-04 Thread via GitHub


dopuskh3 commented on PR #14332:
URL: https://github.com/apache/kafka/pull/14332#issuecomment-1705208040

   cc: @satishd @jeqo 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dopuskh3 opened a new pull request, #14332: [MINOR] Fix TopicPartition comparison

2023-09-04 Thread via GitHub


dopuskh3 opened a new pull request, #14332:
URL: https://github.com/apache/kafka/pull/14332

   This only affects logging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)

2023-09-04 Thread via GitHub


clolov commented on code in PR #14307:
URL: https://github.com/apache/kafka/pull/14307#discussion_r1314892554


##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+
+public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
+
+@Override
+public int brokerCount() {
+return 1;
+}
+
+@Override
+protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+final Integer broker0 = 0;
+final String topicA = "topicA";
+final Integer p0 = 0;
+final Integer partitionCount = 1;
+final Integer replicationFactor = 1;
+final Integer maxBatchCountPerSegment = 1;
+final Map> replicaAssignment = null;
+final boolean enableRemoteLogStorage = true;
+final int beginEpoch = 0;
+final long startOffset = 3;
+
+// Create topicA with 1 partition, 1 RF and enabled with remote 
storage.
+builder.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment, replicaAssignment,
+enableRemoteLogStorage)
+// produce events to partition 0 and expect 3 segments to be 
offloaded
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new 
KeyValueSpec("k2", "v2"))
+.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
+.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", 
"v3"))
+// update the topic config such that it triggers the deletion 
of segments
+.updateTopicConfig(topicA, configsToBeAdded(), 
Collections.emptyList())

Review Comment:
   For my understanding, why do we need this configuration update? How would it 
trigger the segment deletion?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14328: KAFKA-15410: Reassign replica expand, move and shrink integration tests (2/4)

2023-09-04 Thread via GitHub


kamalcph commented on code in PR #14328:
URL: https://github.com/apache/kafka/pull/14328#discussion_r1314897616


##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaShrinkTest.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class ReassignReplicaShrinkTest extends TieredStorageTestHarness {
+
+/**
+ * Cluster of two brokers
+ * @return number of brokers in the cluster
+ */
+@Override
+public int brokerCount() {
+return 2;
+}
+
+/**
+ * Number of partitions in the '__remote_log_metadata' topic
+ * @return number of partitions in the '__remote_log_metadata' topic
+ */
+@Override
+public int numRemoteLogMetadataPartitions() {
+return 2;
+}
+
+@Override
+protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+final Integer broker0 = 0;
+final Integer broker1 = 1;
+final String topicA = "topicA";
+final Integer p0 = 0;
+final Integer p1 = 1;
+final Integer partitionCount = 2;
+final Integer replicationFactor = 2;
+final Integer maxBatchCountPerSegment = 1;
+final boolean enableRemoteLogStorage = true;
+final Map> replicaAssignment = mkMap(
+mkEntry(p0, Arrays.asList(broker0, broker1)),
+mkEntry(p1, Arrays.asList(broker1, broker0))
+);
+
+builder
+// create topicA with 2 partitions and 2 RF
+.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment,
+replicaAssignment, enableRemoteLogStorage)
+// send records to partition 0, expect that the segments are 
uploaded and removed from local log dir
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
+.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
+.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"))
+// send records to partition 1, expect that the segments are 
uploaded and removed from local log dir
+.expectSegmentToBeOffloaded(broker1, topicA, p1, 0, new 
KeyValueSpec("k0", "v0"))
+.expectSegmentToBeOffloaded(broker1, topicA, p1, 1, new 
KeyValueSpec("k1", "v1"))
+.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 2L)
+.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"))
+// shrink the replication factor to 1
+.shrinkReplica(topicA, p0, Collections.singletonList(broker1))
+.shrinkReplica(topicA, p1, Collections.singletonList(broker0))
+.expectLeader(topicA, p0, broker1, false)

Review Comment:
   yes, we are asserting the same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14328: KAFKA-15410: Reassign replica expand, move and shrink integration tests (2/4)

2023-09-04 Thread via GitHub


kamalcph commented on code in PR #14328:
URL: https://github.com/apache/kafka/pull/14328#discussion_r1314896591


##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaExpandTest.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class ReassignReplicaExpandTest extends TieredStorageTestHarness {
+
+protected final Integer broker0 = 0;
+protected final Integer broker1 = 1;
+
+/**
+ * Cluster of two brokers
+ * @return number of brokers in the cluster
+ */
+@Override
+public int brokerCount() {
+return 2;
+}
+
+/**
+ * Number of partitions in the '__remote_log_metadata' topic
+ * @return number of partitions in the '__remote_log_metadata' topic
+ */
+@Override
+public int numRemoteLogMetadataPartitions() {
+return 2;
+}
+
+@Override
+protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+final String topicA = "topicA";
+final String topicB = "topicB";
+final Integer p0 = 0;
+final Integer partitionCount = 5;
+final Integer replicationFactor = 2;
+final Integer maxBatchCountPerSegment = 1;
+final Map> replicaAssignment = null;
+final boolean enableRemoteLogStorage = true;
+final List metadataPartitions = new ArrayList<>();
+for (int i = 0; i < numRemoteLogMetadataPartitions(); i++) {
+metadataPartitions.add(i);
+}
+
+builder
+// create topicA with 5 partitions and 2 RF
+.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment,

Review Comment:
   yes correct. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests

2023-09-04 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-14133:
--
Description: 
{color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which, as of 2nd of August 2022, still need to be moved from 
EasyMock to Mockito, do not have a Jira issue, and do not have open pull requests 
that I am aware of:

{color:#ff8b00}In Review{color}
{color:#00875a}Merged{color}
 # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: 
[~yash.mayya] )
 # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo)
 # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}KStreamPrintTest{color} (owner: Christo)
 # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo)
 # {color:#00875a}MaterializedInternalTest{color} (owner: Christo)
 # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
 # {color:#00875a}ClientUtilsTest{color} (owner: Christo)
 # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
Christo)
 # {color:#00875a}TopologyTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: 
Christo)
 # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo)
 # {color:#00875a}RocksDBStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo)
 # {color:#ff8b00}TaskManagerTest{color} (owner: Christo)
 # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo)
 # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo)
 # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo)
 # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo)
 # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo)
 # {color:#00875a}AssignmentTestUtils{color} (owner: Christo)
 # {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew) (takeover: 
Christo)
 # {color:#ff8b00}StandbyTaskTest{color} (owner: Matthew)
 # {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamThreadTest{color} (owner: Matthew) (takeover: Christo)
 # {color:#ff8b00}StreamsMetricsImplTest{color} (owner: Dalibor) (Captured in 
https://issues.apache.org/jira/browse/KAFKA-12947)
 # {color:#00875a}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: 
[~shekharrajak])
 # {color:#00875a}TimeOrderedWindowStoreTest{color} (owner: [~shekharrajak]) 
[https://github.com/apache/kafka/pull/12777] 
 # {color:#00875a}AbstractStreamTest{color} (owner: Christo)
 # {color:#00875a}KStreamTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}KTableImplTest{color} (owner: Christo)
 # {color:#00875a}KTableTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}SessionCacheFlushListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedCacheFlushListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedTupleForwarderTest{color} (owner: Christo)
 # {color:#00875a}ActiveTaskCreatorTest{color} (owner: Christo)
 # {color:#00875a}ChangelogTopicsTest{color} (owner: Christo)
 # {color:#00875a}GlobalProcessorContextImplTest{color} (owner: Christo)
 # {color:#00875a}RecordCollectorTest{color} (owner: Christo)
 # {color:#00875a}StateRestoreCallbackAdapterTest{color} (owner: Christo)
 # 

[GitHub] [kafka] showuon commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-04 Thread via GitHub


showuon commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1314886326


##
clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationRequest.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ControllerRegistrationRequestData;
+import org.apache.kafka.common.message.ControllerRegistrationResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class ControllerRegistrationRequest extends AbstractRequest {
+public static class Builder extends AbstractRequest.Builder<ControllerRegistrationRequest> {
+private final ControllerRegistrationRequestData data;
+
+public Builder(ControllerRegistrationRequestData data) {
+super(ApiKeys.BROKER_HEARTBEAT);

Review Comment:
   Nice catch, @dengziming !



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-04 Thread via GitHub


dengziming commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1314864521


##
clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationRequest.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ControllerRegistrationRequestData;
+import org.apache.kafka.common.message.ControllerRegistrationResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class ControllerRegistrationRequest extends AbstractRequest {
+public static class Builder extends AbstractRequest.Builder<ControllerRegistrationRequest> {
+private final ControllerRegistrationRequestData data;
+
+public Builder(ControllerRegistrationRequestData data) {
+super(ApiKeys.BROKER_HEARTBEAT);

Review Comment:
   this should be CONTROLLER_REGISTRATION
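
   In other words, the builder would pass the controller-registration key (a sketch of the suggested fix, assuming the `ApiKeys.CONTROLLER_REGISTRATION` constant added for this RPC):

   ```java
   public Builder(ControllerRegistrationRequestData data) {
       super(ApiKeys.CONTROLLER_REGISTRATION); // was ApiKeys.BROKER_HEARTBEAT
       this.data = data;
   }
   ```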



##
clients/src/main/resources/common/message/ControllerRegistrationRequest.json:
##
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 70,

Review Comment:
   we don't have 69, why use 70?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15293) Update metrics doc to add tiered storage metrics

2023-09-04 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15293:
---
Component/s: documentation

> Update metrics doc to add tiered storage metrics
> 
>
> Key: KAFKA-15293
> URL: https://issues.apache.org/jira/browse/KAFKA-15293
> Project: Kafka
>  Issue Type: Sub-task
>  Components: documentation
>Reporter: Abhijeet Kumar
>Assignee: Abhijeet Kumar
>Priority: Critical
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2023-09-04 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761806#comment-17761806
 ] 

Satish Duggana commented on KAFKA-13421:


Moving it to 3.7.0 as there has not been much activity on this issue for the last 
few weeks.

> Fix 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> -
>
> Key: KAFKA-13421
> URL: https://issues.apache.org/jira/browse/KAFKA-13421
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: clients, consumer, flaky-test, unit-test
> Fix For: 3.6.0
>
>
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
>  is failing with this error:
> {code}
> ConsumerBounceTest > testSubscribeWhenTopicUnavailable() PASSED
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup()
>  failed, log available in 
> /home/cmccabe/src/kafka9/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup().test.stdout
>   
>   
> ConsumerBounceTest > 
> testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() 
> FAILED
> org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode 
> = NodeExists
> at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:126) 
>
> at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
>  
> at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
> at 
> kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
> at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:320)
> at 
> kafka.integration.KafkaServerTestHarness.$anonfun$restartDeadBrokers$2(KafkaServerTestHarness.scala:2
> 12)
> at 
> scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
> at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
> at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
> at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
> at 
> kafka.integration.KafkaServerTestHarness.restartDeadBrokers(KafkaServerTestHarness.scala:203)
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsB
> igGroup$1(ConsumerBounceTest.scala:327)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
> at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(C
> onsumerBounceTest.scala:319) 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2023-09-04 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-13421:
---
Fix Version/s: 3.7.0
   (was: 3.6.0)

> Fix 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> -
>
> Key: KAFKA-13421
> URL: https://issues.apache.org/jira/browse/KAFKA-13421
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: clients, consumer, flaky-test, unit-test
> Fix For: 3.7.0
>
>
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
>  is failing with this error:
> {code}
> ConsumerBounceTest > testSubscribeWhenTopicUnavailable() PASSED
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup()
>  failed, log available in 
> /home/cmccabe/src/kafka9/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup().test.stdout
>   
>   
> ConsumerBounceTest > 
> testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() 
> FAILED
> org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode 
> = NodeExists
> at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:126) 
>
> at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
>  
> at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
> at 
> kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
> at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:320)
> at 
> kafka.integration.KafkaServerTestHarness.$anonfun$restartDeadBrokers$2(KafkaServerTestHarness.scala:2
> 12)
> at 
> scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
> at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
> at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
> at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
> at 
> kafka.integration.KafkaServerTestHarness.restartDeadBrokers(KafkaServerTestHarness.scala:203)
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsB
> igGroup$1(ConsumerBounceTest.scala:327)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
> at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(C
> onsumerBounceTest.scala:319) 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)

2023-09-04 Thread via GitHub


showuon commented on PR #14307:
URL: https://github.com/apache/kafka/pull/14307#issuecomment-1705153637

   Will wait for @clolov 's review before merging it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14331: Add documentation for tiered storage metrics

2023-09-04 Thread via GitHub


divijvaidya commented on code in PR #14331:
URL: https://github.com/apache/kafka/pull/14331#discussion_r1314856715


##
docs/ops.html:
##
@@ -1545,6 +1545,51 @@ https://github.com/apache/kafka/blob/d34d84dbef20559e68c899315a0915e9dd740cb0/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java#L64



##
docs/ops.html:
##
@@ -1545,6 +1545,51 @@ 

[GitHub] [kafka] satishd commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)

2023-09-04 Thread via GitHub


satishd commented on code in PR #14307:
URL: https://github.com/apache/kafka/pull/14307#discussion_r1314858032


##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java:
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class PartitionsExpandTest extends TieredStorageTestHarness {
+
+@Override
+public int brokerCount() {
+return 2;
+}
+
+@Override
+protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+final Integer broker0 = 0;
+final Integer broker1 = 1;
+final String topicA = "topicA";
+final Integer p0 = 0;
+final Integer p1 = 1;
+final Integer p2 = 2;
+final Integer partitionCount = 1;
+final Integer replicationFactor = 2;
+final Integer maxBatchCountPerSegment = 1;
+final boolean enableRemoteLogStorage = true;
+final List<Integer> p0Assignment = Arrays.asList(broker0, broker1);
+final List<Integer> p1Assignment = Arrays.asList(broker0, broker1);
+final List<Integer> p2Assignment = Arrays.asList(broker1, broker0);
+
+builder
+.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment,
+Collections.singletonMap(p0, p0Assignment), 
enableRemoteLogStorage)
+// produce events to partition 0
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
+.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
+.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"))
+// expand the topicA partition-count to 3
+.createPartitions(topicA, 3, mkMap(mkEntry(p1, p1Assignment), 
mkEntry(p2, p2Assignment)))
+// consume from the beginning of the topic to read data from 
local and remote storage for partition 0
+.expectFetchFromTieredStorage(broker0, topicA, p0, 2)
+.consume(topicA, p0, 0L, 3, 2)
+
+.expectLeader(topicA, p1, broker0, false)
+.expectLeader(topicA, p2, broker1, false)
+
+// produce events to partition 1
+.expectSegmentToBeOffloaded(broker0, topicA, p1, 0, new 
KeyValueSpec("k0", "v0"))

Review Comment:
   Sure, we can look into that later. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on pull request #14152: KAFKA-7438: Migrate WindowStoreBuilderTest from EasyMock to Mockito

2023-09-04 Thread via GitHub


yashmayya commented on PR #14152:
URL: https://github.com/apache/kafka/pull/14152#issuecomment-1705144816

   Thanks Divij and no worries, this was pretty low priority! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)

2023-09-04 Thread via GitHub


kamalcph commented on code in PR #14307:
URL: https://github.com/apache/kafka/pull/14307#discussion_r1314851919


##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java:
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class PartitionsExpandTest extends TieredStorageTestHarness {
+
+@Override
+public int brokerCount() {
+return 2;
+}
+
+@Override
+protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+final Integer broker0 = 0;
+final Integer broker1 = 1;
+final String topicA = "topicA";
+final Integer p0 = 0;
+final Integer p1 = 1;
+final Integer p2 = 2;
+final Integer partitionCount = 1;
+final Integer replicationFactor = 2;
+final Integer maxBatchCountPerSegment = 1;
+final boolean enableRemoteLogStorage = true;
+final List<Integer> p0Assignment = Arrays.asList(broker0, broker1);
+final List<Integer> p1Assignment = Arrays.asList(broker0, broker1);
+final List<Integer> p2Assignment = Arrays.asList(broker1, broker0);
+
+builder
+.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment,
+Collections.singletonMap(p0, p0Assignment), 
enableRemoteLogStorage)
+// produce events to partition 0
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
+.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
+.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
+.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+new KeyValueSpec("k2", "v2"))
+// expand the topicA partition-count to 3
+.createPartitions(topicA, 3, mkMap(mkEntry(p1, p1Assignment), 
mkEntry(p2, p2Assignment)))
+// consume from the beginning of the topic to read data from 
local and remote storage for partition 0
+.expectFetchFromTieredStorage(broker0, topicA, p0, 2)
+.consume(topicA, p0, 0L, 3, 2)
+
+.expectLeader(topicA, p1, broker0, false)
+.expectLeader(topicA, p2, broker1, false)
+
+// produce events to partition 1
+.expectSegmentToBeOffloaded(broker0, topicA, p1, 0, new 
KeyValueSpec("k0", "v0"))

Review Comment:
   Prefer to keep the format as it is. We can refactor it later if required.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya merged pull request #14152: KAFKA-7438: Migrate WindowStoreBuilderTest from EasyMock to Mockito

2023-09-04 Thread via GitHub


divijvaidya merged PR #14152:
URL: https://github.com/apache/kafka/pull/14152


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #14152: KAFKA-7438: Migrate WindowStoreBuilderTest from EasyMock to Mockito

2023-09-04 Thread via GitHub


divijvaidya commented on PR #14152:
URL: https://github.com/apache/kafka/pull/14152#issuecomment-1705136079

   I have verified that the test passes in CI 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14152/1/testReport/org.apache.kafka.streams.state.internals/WindowStoreBuilderTest/
 and the rest of the failures are flaky/unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14331: Add documentation for tiered storage metrics

2023-09-04 Thread via GitHub


abhijeetk88 commented on code in PR #14331:
URL: https://github.com/apache/kafka/pull/14331#discussion_r1314820039


##
docs/ops.html:
##
@@ -1545,6 +1545,51 @@ 

[GitHub] [kafka] satishd commented on a diff in pull request #14331: Add documentation for tiered storage metrics

2023-09-04 Thread via GitHub


satishd commented on code in PR #14331:
URL: https://github.com/apache/kafka/pull/14331#discussion_r1314813270


##
docs/ops.html:
##
@@ -1545,6 +1545,51 @@ 

[GitHub] [kafka] divijvaidya merged pull request #14223: KAFKA-14133: Move AbstractStreamTest and RocksDBMetricsRecordingTriggerTest to Mockito

2023-09-04 Thread via GitHub


divijvaidya merged PR #14223:
URL: https://github.com/apache/kafka/pull/14223


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15352) Ensure consistency while deleting the remote log segments

2023-09-04 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15352:
-
Description: 
In KAFKA-14888, the remote log segments that breach the retention time/size are 
deleted before the log-start-offset is updated. In the middle of deletion, if the 
consumer starts to read from the beginning of the topic, then 
it will fail to read the messages and UNKNOWN_SERVER_ERROR will be thrown back 
to the consumer.

To ensure consistency, similar to local log segments where the actual segments 
are deleted after {{{}segment.delete.delay.ms{}}}, we should update the 
log-start-offset first before deleting the remote log segment.

See the [PR#13561|https://github.com/apache/kafka/pull/13561] and 
[comment|https://github.com/apache/kafka/pull/13561#discussion_r1293086543] for 
more details.

Case-2:
The old-leader (follower) can delete the remote log segment in the middle of 
leader election. We need to update the log-start-offset metadata for this case.
See this comment 
[https://github.com/apache/kafka/pull/13561#discussion_r1293081560]

  was:
In Kafka-14888, the remote log segments are deleted which breaches the 
retention time/size before updating the log-start-offset. In middle of 
deletion, if the consumer starts to read from the beginning of the topic, then 
it will fail to read the messages and UNKNOWN_SERVER_ERROR will be thrown back 
to the consumer.

To ensure consistency, similar to local log segments where the actual segments 
are deleted after {{segment.delete.delay.ms}}, we should update the 
log-start-offset first before deleting the remote log segment.

See the [PR#13561|https://github.com/apache/kafka/pull/13561] and 
[comment|https://github.com/apache/kafka/pull/13561#discussion_r1293086543] for 
more details.


> Ensure consistency while deleting the remote log segments
> -
>
> Key: KAFKA-15352
> URL: https://issues.apache.org/jira/browse/KAFKA-15352
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Blocker
> Fix For: 3.6.0
>
>
> In KAFKA-14888, the remote log segments that breach the retention time/size are 
> deleted before the log-start-offset is updated. In the middle of deletion, if the 
> consumer starts to read from the beginning of the topic, 
> then it will fail to read the messages and UNKNOWN_SERVER_ERROR will be 
> thrown back to the consumer.
> To ensure consistency, similar to local log segments where the actual 
> segments are deleted after {{{}segment.delete.delay.ms{}}}, we should update 
> the log-start-offset first before deleting the remote log segment.
> See the [PR#13561|https://github.com/apache/kafka/pull/13561] and 
> [comment|https://github.com/apache/kafka/pull/13561#discussion_r1293086543] 
> for more details.
> Case-2:
> The old-leader (follower) can delete the remote log segment in the middle of 
> leader election. We need to update the log-start-offset metadata for this 
> case.
> See this comment 
> [https://github.com/apache/kafka/pull/13561#discussion_r1293081560]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
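
For readers following the ordering question above, a minimal stand-alone sketch of 
the intended sequence (advance the log-start-offset first, then delete the remote 
segment) is shown below. The class and helper names are invented for illustration; 
this is not the actual RemoteLogManager code.

{code:java}
import java.util.List;

import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;

// Hypothetical sketch only; names are made up for illustration.
public class RemoteSegmentCleanupSketch {
    private final RemoteStorageManager remoteStorageManager;
    private volatile long logStartOffset;

    public RemoteSegmentCleanupSketch(RemoteStorageManager remoteStorageManager, long logStartOffset) {
        this.remoteStorageManager = remoteStorageManager;
        this.logStartOffset = logStartOffset;
    }

    public void cleanup(List<RemoteLogSegmentMetadata> eligibleSegments) throws RemoteStorageException {
        if (eligibleSegments.isEmpty()) {
            return;
        }
        // 1. Advance the log-start-offset past every segment about to be deleted, so a
        //    consumer fetching from the beginning is not routed into soon-to-be-deleted data.
        long highestEndOffset = eligibleSegments.stream()
                .mapToLong(RemoteLogSegmentMetadata::endOffset)
                .max()
                .getAsLong();
        logStartOffset = Math.max(logStartOffset, highestEndOffset + 1);

        // 2. Only after the offset is advanced, delete the segment data from remote storage.
        for (RemoteLogSegmentMetadata segment : eligibleSegments) {
            remoteStorageManager.deleteLogSegmentData(segment);
        }
    }
}
{code}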


[jira] [Updated] (KAFKA-15351) Update log-start-offset after leader election for topics enabled with remote storage

2023-09-04 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15351:
-
Description: 
Case-1:

In the FETCH response, the leader-log-start-offset will be piggy-backed. But, 
there can be a scenario:
 # Leader deleted the remote log segment and updated its log-start-offset
 # Before replica-2 updated its log-start-offset via FETCH-request, the 
leadership changed to replica-2.
 # There are no more eligible segments to delete from remote.
 # The log-start-offset will be stale (referring to old log-start-offset but 
the data was already removed from remote)
 # If the consumer starts to read from the beginning of the topic, it will fail 
to read.
See this comment 
[https://github.com/apache/kafka/pull/13561#discussion_r1293081560] for more 
details.

Case-3: 

When tiered storage is enabled on the topic, and the last-standing-replica is 
restarted, then the log-start-offset should be updated up to the 
log-start-offset-checkpoint offset.

  was:
Case-1:

In the FETCH response, the leader-log-start-offset will be piggy-backed. But, 
there can be a scenario:
 # Leader deleted the remote log segment and updates it's log-start-offset
 # Before the replica-2 update it's log-start-offset via FETCH-request, the 
leadership changed to replica-2.
 # There are no more eligible segments to delete from remote.
 # The log-start-offset will be stale (referring to old log-start-offset but 
the data was already removed from remote)
 # If the consumer starts to read from the beginning of the topic, it will fail 
to read.
See this comment 
[https://github.com/apache/kafka/pull/13561#discussion_r1293081560] for more 
details.

Case-2:
The old-leader (follower) can delete the remote log segment in the middle of 
leader election. We need to update the log-start-offset metadata for this case.

Case-3: 

When tiered storage is enabled on the topic, and the last-standing-replica is 
restarted, then the log-start-offset should be updated upto 
log-start-offset-checkpoint offset.


> Update log-start-offset after leader election for topics enabled with remote 
> storage
> 
>
> Key: KAFKA-15351
> URL: https://issues.apache.org/jira/browse/KAFKA-15351
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Blocker
> Fix For: 3.6.0
>
>
> Case-1:
> In the FETCH response, the leader-log-start-offset will be piggy-backed. But, 
> there can be a scenario:
>  # Leader deleted the remote log segment and updated its log-start-offset
>  # Before replica-2 updated its log-start-offset via FETCH-request, the 
> leadership changed to replica-2.
>  # There are no more eligible segments to delete from remote.
>  # The log-start-offset will be stale (referring to old log-start-offset but 
> the data was already removed from remote)
>  # If the consumer starts to read from the beginning of the topic, it will 
> fail to read.
> See this comment 
> [https://github.com/apache/kafka/pull/13561#discussion_r1293081560] for more 
> details.
> Case-3: 
> When tiered storage is enabled on the topic, and the last-standing-replica is 
> restarted, then the log-start-offset should be updated up to the 
> log-start-offset-checkpoint offset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kamalcph commented on a diff in pull request #14330: KAFKA-15410: Delete records with tiered storage integration test (4/4)

2023-09-04 Thread via GitHub


kamalcph commented on code in PR #14330:
URL: https://github.com/apache/kafka/pull/14330#discussion_r1314697994


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1066,50 +1066,53 @@ public static boolean 
isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata
 // Check for out of bound epochs between segment epochs and current 
leader epochs.
 Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
 Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
+
+// FIXME: We have to remove the below check too for `DELETE_RECORDS` 
API to work properly.

Review Comment:
   I'm not clear on the usage of the `isRemoteSegmentWithinLeaderEpochs` validation in 
the segment deletion path. I have commented out the code for now and will go through 
it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
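
For context on the `isRemoteSegmentWithinLeaderEpochs` discussion above, a simplified 
stand-alone sketch of the underlying idea is shown below: both the leader's epoch 
history and the segment's epoch state are epoch-to-start-offset maps, and a segment is 
considered valid only if its epochs fall inside the leader's lineage. The names and 
exact conditions here are illustrative, not the RemoteLogManager implementation.

{code:java}
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class EpochLineageCheckSketch {

    // True if every epoch claimed by the segment exists in the leader's lineage and the
    // segment does not extend past the point where its last epoch ends on the leader.
    static boolean isWithinLeaderLineage(NavigableMap<Integer, Long> leaderEpochs,
                                         NavigableMap<Integer, Long> segmentEpochs,
                                         long segmentEndOffset) {
        for (Integer epoch : segmentEpochs.keySet()) {
            if (!leaderEpochs.containsKey(epoch)) {
                return false;
            }
        }
        Integer lastEpoch = segmentEpochs.lastKey();
        Map.Entry<Integer, Long> nextLeaderEpoch = leaderEpochs.higherEntry(lastEpoch);
        return nextLeaderEpoch == null || segmentEndOffset <= nextLeaderEpoch.getValue() - 1;
    }

    public static void main(String[] args) {
        NavigableMap<Integer, Long> leaderEpochs = new TreeMap<>();
        leaderEpochs.put(0, 0L);   // epoch 0 starts at offset 0
        leaderEpochs.put(1, 100L); // epoch 1 starts at offset 100

        NavigableMap<Integer, Long> segmentEpochs = new TreeMap<>();
        segmentEpochs.put(0, 0L);  // the segment only covers epoch 0

        System.out.println(isWithinLeaderLineage(leaderEpochs, segmentEpochs, 99L));  // true
        System.out.println(isWithinLeaderLineage(leaderEpochs, segmentEpochs, 150L)); // false
    }
}
{code}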



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14322: KAFKA-15418: update statement on decompression

2023-09-04 Thread via GitHub


divijvaidya commented on code in PR #14322:
URL: https://github.com/apache/kafka/pull/14322#discussion_r1314750361


##
docs/design.html:
##
@@ -136,8 +136,10 @@ 
-Kafka supports this with an efficient batching format. A batch of messages 
can be clumped together compressed and sent to the server in this form. This 
batch of messages will be written in compressed form and will
-remain compressed in the log and will only be decompressed by the consumer.
+Kafka supports this with an efficient batching format. A batch of messages 
can be grouped together, compressed, and sent to the server in this form. The 
broker decompresses the batch in order to validate it. For
+example, it validates that the number of records in the batch is same as 
what batch header states. The broker may also potentially modify the batch 
(e.g., if the topic is compacted, the broker will filter out 

Review Comment:
   I just realised another thing.
   
   "if the topic is compacted, the broker will filter out records eligible for 
compaction prior to writing to disk"
   
   Are you referring to the fact that records written to a compacted topic must 
have a non-null key or else they will be rejected? If so, then perhaps we need to 
phrase it differently.
   
   Let me get back to you with a suggestion here in a couple of hours.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
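
As background for the wording being discussed, the batching and compression behaviour 
is driven by the producer-side compression.type setting, and records sent to a 
compacted topic must carry a non-null key. A minimal producer sketch follows; the 
bootstrap address and topic name are placeholders.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CompressedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Batches are compressed on the producer; the broker decompresses them to validate the batch.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A record for a compacted topic needs a non-null key, otherwise it is rejected.
            producer.send(new ProducerRecord<>("my-compacted-topic", "key-1", "value-1"));
        }
    }
}
{code}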



[jira] [Commented] (KAFKA-14936) Add Grace Period To Stream Table Join

2023-09-04 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761766#comment-17761766
 ] 

Satish Duggana commented on KAFKA-14936:


[~wcarlson5] Is this completed for 3.6.0? If yes, please close the JIRA by 
updating the respective fix version fields.

> Add Grace Period To Stream Table Join
> -
>
> Key: KAFKA-14936
> URL: https://issues.apache.org/jira/browse/KAFKA-14936
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
>  Labels: kip, streams
>
> Include the grace period for stream table joins as described in kip 923.
> Also add a rocksDB time based queueing implementation of 
> `TimeOrderedKeyValueBuffer`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
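
For readers looking for what the KIP-923 feature looks like in user code, a minimal 
sketch is below. It assumes the API landed as Joined#withGracePeriod as described in 
the KIP; topic names and serdes are placeholders.

{code:java}
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamTableJoinGraceSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orders = builder.stream("orders",
                Consumed.with(Serdes.String(), Serdes.String()));   // placeholder topic
        KTable<String, String> customers = builder.table("customers",
                Consumed.with(Serdes.String(), Serdes.String()));   // placeholder topic

        // Buffer stream records for up to five minutes so slightly late table updates can still join.
        KStream<String, String> enriched = orders.join(
                customers,
                (order, customer) -> order + "/" + customer,
                Joined.<String, String, String>with(Serdes.String(), Serdes.String(), Serdes.String())
                        .withGracePeriod(Duration.ofMinutes(5)));   // assumed KIP-923 API

        enriched.to("enriched-orders");
        builder.build();
    }
}
{code}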


[GitHub] [kafka] kamalcph commented on a diff in pull request #14330: KAFKA-15410: Delete records with tiered storage integration test (4/4)

2023-09-04 Thread via GitHub


kamalcph commented on code in PR #14330:
URL: https://github.com/apache/kafka/pull/14330#discussion_r1314697994


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1066,50 +1066,53 @@ public static boolean 
isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata
 // Check for out of bound epochs between segment epochs and current 
leader epochs.
 Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
 Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
+
+// FIXME: We have to remove the below check too for `DELETE_RECORDS` 
API to work properly.

Review Comment:
   I'm not clear on the usage of the `isRemoteSegmentWithinLeaderEpochs` validation in 
the segment deletion path. I have commented out the code for now and will go through 
it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor

2023-09-04 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761763#comment-17761763
 ] 

Satish Duggana commented on KAFKA-12473:


Removing the KIP from 3.6.0 release plan as it does not seem to be completed.

> Make the "cooperative-sticky, range" as the default assignor
> 
>
> Key: KAFKA-12473
> URL: https://issues.apache.org/jira/browse/KAFKA-12473
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Critical
>  Labels: kip
>
> Now that 3.0 is coming up, we can change the default 
> ConsumerPartitionAssignor to something better than the RangeAssignor. The 
> original plan was to switch over to the StickyAssignor, but now that we have 
> incremental cooperative rebalancing we should  consider using the new 
> CooperativeStickyAssignor instead: this will enable the consumer group to 
> follow the COOPERATIVE protocol, improving the rebalancing experience OOTB.
> KIP: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
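
Until the default changes, applications can opt in to the cooperative protocol 
explicitly. A minimal consumer configuration sketch is below; the bootstrap address 
and group id are placeholders.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CooperativeConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Explicitly select the cooperative-sticky assignor; per this ticket it is not the default yet.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe and poll as usual; rebalances now follow the COOPERATIVE protocol.
        }
    }
}
{code}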


[GitHub] [kafka] kamalcph commented on a diff in pull request #14330: KAFKA-15410: Delete records with tiered storage integration test (4/4)

2023-09-04 Thread via GitHub


kamalcph commented on code in PR #14330:
URL: https://github.com/apache/kafka/pull/14330#discussion_r1314695377


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1066,50 +1066,53 @@ public static boolean 
isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata
 // Check for out of bound epochs between segment epochs and current 
leader epochs.
 Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
 Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
+
+// FIXME: We have to remove the below check too for `DELETE_RECORDS` 
API to work properly.
 if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > 
leaderEpochs.lastKey()) {
-LOGGER.debug("[{}] Remote segment {} is not within the partition 
leader epoch lineage. Remote segment epochs: {} and partition leader epochs: 
{}",
-segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
+LOGGER.debug("Segment {} is not within the partition leader epoch 
lineage. " +
+"Remote segment epochs: {} and partition leader 
epochs: {}",
+segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, 
leaderEpochs);
 return false;
 }
 
 for (Map.Entry<Integer, Long> entry : segmentLeaderEpochs.entrySet()) {
 int epoch = entry.getKey();
-long offset = entry.getValue();
+//long offset = entry.getValue();
 
 // If segment's epoch does not exist in the leader epoch lineage 
then it is not a valid segment.
 if (!leaderEpochs.containsKey(epoch)) {
-LOGGER.debug("[{}]  Remote segment {}'s epoch {} is not within 
the leader epoch lineage. Remote segment epochs: {} and partition leader 
epochs: {}",
-segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
+LOGGER.debug("Segment {} epoch {} is not within the leader 
epoch lineage. " +
+"Remote segment epochs: {} and partition 
leader epochs: {}",
+segmentMetadata.remoteLogSegmentId(), epoch, 
segmentLeaderEpochs, leaderEpochs);
 return false;
 }
 
 // Segment's first epoch's offset should be more than or equal to 
the respective leader epoch's offset.
-if (epoch == segmentFirstEpoch && offset < 
leaderEpochs.get(epoch)) {
-LOGGER.debug("[{}]  Remote segment {}'s first epoch {}'s 
offset is less than leader epoch's offset {}.",
-segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch));
-return false;
-}
+//if (epoch == segmentFirstEpoch && offset < 
leaderEpochs.get(epoch)) {
+//LOGGER.debug("Segment {} first epoch {} offset is less than 
leader epoch offset {}.",
+//segmentMetadata.remoteLogSegmentId(), epoch, 
leaderEpochs.get(epoch));
+//return false;
+//}
 
 // Segment's end offset should be less than or equal to the 
respective leader epoch's offset.
 if (epoch == segmentLastEpoch) {
 Map.Entry<Integer, Long> nextEntry = 
leaderEpochs.higherEntry(epoch);
 if (nextEntry != null && segmentEndOffset > 
nextEntry.getValue() - 1) {
-LOGGER.debug("[{}]  Remote segment {}'s end offset {} is 
more than leader epoch's offset {}.",
-segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), segmentEndOffset, nextEntry.getValue() - 
1);
+LOGGER.debug("Segment {} end offset {} is more than leader 
epoch offset {}.",
+segmentMetadata.remoteLogSegmentId(), 
segmentEndOffset, nextEntry.getValue() - 1);
 return false;
 }
 }
 
 // Next segment epoch entry and next leader epoch entry should be 
same to ensure that the segment's epoch
 // is within the leader epoch lineage.
 if (epoch != segmentLastEpoch && 
!leaderEpochs.higherEntry(epoch).equals(segmentLeaderEpochs.higherEntry(epoch)))
 {
-LOGGER.debug("[{}]  Remote segment {}'s epoch {} is not within 
the leader epoch lineage. Remote segment epochs: {} and partition leader 
epochs: {}",
-segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
+LOGGER.debug("Segment {} epoch {} is not within the leader 
epoch lineage. " +

Review Comment:
   The logger context already holds the partition information, so I removed the 
`segmentMetadata.topicIdPartition()` placeholder from the log statement.



-- 
This is an automated 

[GitHub] [kafka] kamalcph commented on a diff in pull request #14330: KAFKA-15410: Delete records with tiered storage integration test (4/4)

2023-09-04 Thread via GitHub


kamalcph commented on code in PR #14330:
URL: https://github.com/apache/kafka/pull/14330#discussion_r1314695377


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1066,50 +1066,53 @@ public static boolean 
isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata
 // Check for out of bound epochs between segment epochs and current 
leader epochs.
 Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
 Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
+
+// FIXME: We have to remove the below check too for `DELETE_RECORDS` 
API to work properly.
 if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > 
leaderEpochs.lastKey()) {
-LOGGER.debug("[{}] Remote segment {} is not within the partition 
leader epoch lineage. Remote segment epochs: {} and partition leader epochs: 
{}",
-segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
+LOGGER.debug("Segment {} is not within the partition leader epoch 
lineage. " +
+"Remote segment epochs: {} and partition leader 
epochs: {}",
+segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, 
leaderEpochs);
 return false;
 }
 
 for (Map.Entry<Integer, Long> entry : segmentLeaderEpochs.entrySet()) {
 int epoch = entry.getKey();
-long offset = entry.getValue();
+//long offset = entry.getValue();
 
 // If segment's epoch does not exist in the leader epoch lineage 
then it is not a valid segment.
 if (!leaderEpochs.containsKey(epoch)) {
-LOGGER.debug("[{}]  Remote segment {}'s epoch {} is not within 
the leader epoch lineage. Remote segment epochs: {} and partition leader 
epochs: {}",
-segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
+LOGGER.debug("Segment {} epoch {} is not within the leader 
epoch lineage. " +
+"Remote segment epochs: {} and partition 
leader epochs: {}",
+segmentMetadata.remoteLogSegmentId(), epoch, 
segmentLeaderEpochs, leaderEpochs);
 return false;
 }
 
 // Segment's first epoch's offset should be more than or equal to 
the respective leader epoch's offset.
-if (epoch == segmentFirstEpoch && offset < 
leaderEpochs.get(epoch)) {
-LOGGER.debug("[{}]  Remote segment {}'s first epoch {}'s 
offset is less than leader epoch's offset {}.",
-segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch));
-return false;
-}
+//if (epoch == segmentFirstEpoch && offset < 
leaderEpochs.get(epoch)) {
+//LOGGER.debug("Segment {} first epoch {} offset is less than 
leader epoch offset {}.",
+//segmentMetadata.remoteLogSegmentId(), epoch, 
leaderEpochs.get(epoch));
+//return false;
+//}
 
 // Segment's end offset should be less than or equal to the 
respective leader epoch's offset.
 if (epoch == segmentLastEpoch) {
 Map.Entry<Integer, Long> nextEntry = 
leaderEpochs.higherEntry(epoch);
 if (nextEntry != null && segmentEndOffset > 
nextEntry.getValue() - 1) {
-LOGGER.debug("[{}]  Remote segment {}'s end offset {} is 
more than leader epoch's offset {}.",
-segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), segmentEndOffset, nextEntry.getValue() - 
1);
+LOGGER.debug("Segment {} end offset {} is more than leader 
epoch offset {}.",
+segmentMetadata.remoteLogSegmentId(), 
segmentEndOffset, nextEntry.getValue() - 1);
 return false;
 }
 }
 
 // Next segment epoch entry and next leader epoch entry should be 
same to ensure that the segment's epoch
 // is within the leader epoch lineage.
 if (epoch != segmentLastEpoch && 
!leaderEpochs.higherEntry(epoch).equals(segmentLeaderEpochs.higherEntry(epoch)))
 {
-LOGGER.debug("[{}]  Remote segment {}'s epoch {} is not within 
the leader epoch lineage. Remote segment epochs: {} and partition leader 
epochs: {}",
-segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
+LOGGER.debug("Segment {} epoch {} is not within the leader 
epoch lineage. " +

Review Comment:
   The logger context already holds the partition information, so I removed the 
`partition` placeholder from the logs.



-- 
This is an automated message from the Apache Git 

[GitHub] [kafka] dajac commented on pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest

2023-09-04 Thread via GitHub


dajac commented on PR #14321:
URL: https://github.com/apache/kafka/pull/14321#issuecomment-1704917384

   @rreddy-22 @CalvinConfluent @jolshan Thanks for your comments. I have 
addressed all of them. I have also changed the schema a bit and fixed a few 
other tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest

2023-09-04 Thread via GitHub


dajac commented on code in PR #14321:
URL: https://github.com/apache/kafka/pull/14321#discussion_r1314671160


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -631,4 +631,33 @@ public void testValidateOffsetCommit() {
 // This should succeed.
 group.validateOffsetCommit("member-id", "", 0);
 }
+
+@Test
+public void testValidateOffsetFetch() {
+SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
+ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
+
+// Simulate a call from the admin client without member id and member 
epoch.
+// This should pass only if the group is empty.
+group.validateOffsetFetch("", -1, Long.MAX_VALUE);
+
+// The member does not exist.
+assertThrows(UnknownMemberIdException.class, () ->
+group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE));
+
+// Create a member.
+snapshotRegistry.getOrCreateSnapshot(0);
+group.getOrMaybeCreateMember("member-id", true);
+
+// The member does not exist at epoch 0.

Review Comment:
   This comment is confusing. The epoch was referring to the epoch of the 
snapshot here. I rephrased it to make it clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest

2023-09-04 Thread via GitHub


dajac commented on code in PR #14321:
URL: https://github.com/apache/kafka/pull/14321#discussion_r131483


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -631,4 +631,33 @@ public void testValidateOffsetCommit() {
 // This should succeed.
 group.validateOffsetCommit("member-id", "", 0);
 }
+
+@Test
+public void testValidateOffsetFetch() {
+SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
+ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
+
+// Simulate a call from the admin client without member id and member 
epoch.
+// This should pass only if the group is empty.

Review Comment:
   That part of the comment was wrong. I just removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest

2023-09-04 Thread via GitHub


dajac commented on code in PR #14321:
URL: https://github.com/apache/kafka/pull/14321#discussion_r1314665557


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -181,10 +181,28 @@ public 
List fetchOffsets(
 String groupId,
 List topics,
 long committedOffset
+) {

Review Comment:
   This or to simulate the admin client case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
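
As a reference for the admin-client case mentioned above, committed offsets can be 
fetched without a member id or member epoch through the Admin API. A minimal sketch 
follows; the bootstrap address and group id are placeholders.

{code:java}
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class AdminOffsetFetchExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // The admin client sends an OffsetFetch without supplying a member id or epoch.
            Map<TopicPartition, OffsetAndMetadata> offsets = admin
                    .listConsumerGroupOffsets("example-group")                    // placeholder group
                    .partitionsToOffsetAndMetadata()
                    .get();
            offsets.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
        }
    }
}
{code}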



[GitHub] [kafka] dajac commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest

2023-09-04 Thread via GitHub


dajac commented on code in PR #14321:
URL: https://github.com/apache/kafka/pull/14321#discussion_r1314664510


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -181,10 +181,28 @@ public 
List fetchOffsets(
 String groupId,
 List topics,
 long committedOffset
+) {
+return fetchOffsets(
+groupId,
+"",
+-1,
+topics,
+committedOffset
+);
+}
+
+public List 
fetchOffsets(
+String groupId,
+String memberId,
+int memberEpoch,
+List topics,
+long committedOffset
 ) {
 OffsetFetchResponseData.OffsetFetchResponseGroup response = 
offsetMetadataManager.fetchOffsets(
 new OffsetFetchRequestData.OffsetFetchRequestGroup()
 .setGroupId(groupId)
+.setMemberId(memberId)

Review Comment:
   Correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest

2023-09-04 Thread via GitHub


dajac commented on code in PR #14321:
URL: https://github.com/apache/kafka/pull/14321#discussion_r1314663349


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -823,9 +823,17 @@ public void validateOffsetCommit(
 
 /**
  * Validates the OffsetFetch request.
+ *
+ * @param memberId  The member id. This is not provided for 
generic groups.
+ * @param memberEpoch   The member epoch for consumer groups. This 
is not provided for generic groups.
+ * @param lastCommittedOffset   The last committed offsets in the timeline.
  */
 @Override
-public void validateOffsetFetch() throws GroupIdNotFoundException {
+public void validateOffsetFetch(
+String memberId,

Review Comment:
   Correct. I mentioned this in the javadoc of the method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest

2023-09-04 Thread via GitHub


dajac commented on code in PR #14321:
URL: https://github.com/apache/kafka/pull/14321#discussion_r1314661887


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1409,6 +1444,82 @@ public void 
testFetchAllOffsetsAtDifferentCommittedOffset() {
 ), context.fetchAllOffsets("group", Long.MAX_VALUE));
 }
 
+@Test
+public void testConsumerGroupOffsetFetchWithMemberIdAndEpoch() {
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+// Create consumer group.
+ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+// Create member.
+group.getOrMaybeCreateMember("member", true);
+// Commit offset.
+context.commitOffset("group", "foo", 0, 100L, 1);
+
+// Fetch offsets case.
+List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = 
Collections.singletonList(
+new OffsetFetchRequestData.OffsetFetchRequestTopics()
+.setName("foo")
+.setPartitionIndexes(Collections.singletonList(0))
+);
+
+assertEquals(Collections.singletonList(
+new OffsetFetchResponseData.OffsetFetchResponseTopics()
+.setName("foo")
+.setPartitions(Collections.singletonList(
+mkOffsetPartitionResponse(0, 100L, 1, "metadata")
+))
+), context.fetchOffsets("group", "member", 0, topics, Long.MAX_VALUE));
+
+// Fetch all offsets case.
+assertEquals(Collections.singletonList(
+new OffsetFetchResponseData.OffsetFetchResponseTopics()
+.setName("foo")
+.setPartitions(Collections.singletonList(
+mkOffsetPartitionResponse(0, 100L, 1, "metadata")
+))
+), context.fetchAllOffsets("group", "member", 0, Long.MAX_VALUE));
+}
+
+@Test
+public void testConsumerGroupOffsetFetchWithUnknownMemberId() {
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", 
true);
+
+// Fetch offsets case.
+List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = 
Collections.singletonList(
+new OffsetFetchRequestData.OffsetFetchRequestTopics()
+.setName("foo")
+.setPartitionIndexes(Collections.singletonList(0))
+);
+
+assertThrows(UnknownMemberIdException.class,
+() -> context.fetchOffsets("group", "member", 0, topics, 
Long.MAX_VALUE));
+
+// Fetch all offsets case.
+assertThrows(UnknownMemberIdException.class,
+() -> context.fetchAllOffsets("group", "member", 0, 
Long.MAX_VALUE));
+}
+
+@Test
+public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() {

Review Comment:
   Sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig

2023-09-04 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-6527:
--
Fix Version/s: 3.7.0
   (was: 3.6.0)

> Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
> 
>
> Key: KAFKA-6527
> URL: https://issues.apache.org/jira/browse/KAFKA-6527
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
>  Labels: flakey
> Fix For: 3.7.0
>
>
> {code:java}
> java.lang.AssertionError: Log segment size increase not applied
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig

2023-09-04 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761748#comment-17761748
 ] 

Satish Duggana commented on KAFKA-6527:
---

Moved to 3.7.0 as this JIRA is not yet assigned and we are near code freeze for 
3.6.0. 

> Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
> 
>
> Key: KAFKA-6527
> URL: https://issues.apache.org/jira/browse/KAFKA-6527
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
>  Labels: flakey
> Fix For: 3.7.0
>
>
> {code:java}
> java.lang.AssertionError: Log segment size increase not applied
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on pull request #13908: KAFKA-15052 Fix the flaky testBalancePartitionLeaders - part II

2023-09-04 Thread via GitHub


showuon commented on PR #13908:
URL: https://github.com/apache/kafka/pull/13908#issuecomment-1704888456

   Backported into 3.6 branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15052) Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()

2023-09-04 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-15052.
---
Resolution: Fixed

> Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()
> -
>
> Key: KAFKA-15052
> URL: https://issues.apache.org/jira/browse/KAFKA-15052
> Project: Kafka
>  Issue Type: Test
>Reporter: Dimitar Dimitrov
>Assignee: Dimitar Dimitrov
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.6.0
>
>
> Test failed at 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1892/tests/]
>  as well as in various local runs.
> The test creates a topic, fences a broker, notes partition imbalance due to 
> another broker taking over the partition the fenced broker lost, re-registers 
> and unfences the fenced broker, sends {{AlterPartition}} for the lost 
> partition adding the now unfenced broker back to its ISR, then waits for the 
> partition imbalance to disappear.
> The local failures seem to happen when the brokers (including the ones that 
> never get fenced by the test) accidentally get fenced by losing their session 
> due to reaching the (aggressively low for test purposes) session timeout.
> The Cloudbees failure quoted above also seems to indicate that this happened:
> {code:java}
> ...[truncated 738209 chars]...
> 23. (org.apache.kafka.controller.QuorumController:768)
> [2023-06-02 18:17:22,202] DEBUG [QuorumController id=0] Scheduling write 
> event for maybeBalancePartitionLeaders because scheduled (DEFERRED), 
> checkIntervalNs (OptionalLong[10]) and isImbalanced (true) 
> (org.apache.kafka.controller.QuorumController:1401)
> [2023-06-02 18:17:22,202] INFO [QuorumController id=0] Fencing broker 2 
> because its session has timed out. 
> (org.apache.kafka.controller.ReplicationControlManager:1459)
> [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] handleBrokerFenced: 
> changing partition(s): foo-0, foo-1, foo-2 
> (org.apache.kafka.controller.ReplicationControlManager:1750)
> [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] partition change for 
> foo-0 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 
> -> 3, partitionEpoch: 2 -> 3 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for 
> foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: isr: [2, 3] -> [3], leaderEpoch: 
> 3 -> 4, partitionEpoch: 4 -> 5 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for 
> foo-2 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 
> -> 3, partitionEpoch: 2 -> 3 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,205] DEBUG append(batch=LocalRecordBatch(leaderEpoch=1, 
> appendTimestamp=240, 
> records=[ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=1, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=[3], leader=3, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=2, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(BrokerRegistrationChangeRecord(brokerId=2, 
> brokerEpoch=3, fenced=1, inControlledShutdown=0) at version 0)]), 
> prevOffset=27) (org.apache.kafka.metalog.LocalLogManager$SharedLogData:253)
> [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Creating in-memory 
> snapshot 27 (org.apache.kafka.timeline.SnapshotRegistry:197)
> [2023-06-02 18:17:22,205] DEBUG [LocalLogManager 0] Node 0: running log 
> check. (org.apache.kafka.metalog.LocalLogManager:512)
> [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Read-write operation 
> maybeFenceReplicas(451616131) will be completed when the log reaches offset 
> 27. (org.apache.kafka.controller.QuorumController:768)
> [2023-06-02 18:17:22,208] INFO [QuorumController id=0] Fencing broker 3 
> because its session has timed out. 
> (org.apache.kafka.controller.ReplicationControlManager:1459)
> [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] handleBrokerFenced: 
> changing partition(s): foo-1 
> (org.apache.kafka.controller.ReplicationControlManager:1750)
> [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] partition change for 
> foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 3 -> -1, leaderEpoch: 4 
> -> 

[jira] [Updated] (KAFKA-15052) Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()

2023-09-04 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15052:
--
Fix Version/s: 3.6.0
   (was: 3.7.0)

> Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()
> -
>
> Key: KAFKA-15052
> URL: https://issues.apache.org/jira/browse/KAFKA-15052
> Project: Kafka
>  Issue Type: Test
>Reporter: Dimitar Dimitrov
>Assignee: Dimitar Dimitrov
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.6.0
>
>
> Test failed at 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1892/tests/]
>  as well as in various local runs.
> The test creates a topic, fences a broker, notes partition imbalance due to 
> another broker taking over the partition the fenced broker lost, re-registers 
> and unfences the fenced broker, sends {{AlterPartition}} for the lost 
> partition adding the now unfenced broker back to its ISR, then waits for the 
> partition imbalance to disappear.
> The local failures seem to happen when the brokers (including the ones that 
> never get fenced by the test) accidentally get fenced by losing their session 
> due to reaching the (aggressively low for test purposes) session timeout.
> The Cloudbees failure quoted above also seems to indicate that this happened:
> {code:java}
> ...[truncated 738209 chars]...
> 23. (org.apache.kafka.controller.QuorumController:768)
> [2023-06-02 18:17:22,202] DEBUG [QuorumController id=0] Scheduling write 
> event for maybeBalancePartitionLeaders because scheduled (DEFERRED), 
> checkIntervalNs (OptionalLong[10]) and isImbalanced (true) 
> (org.apache.kafka.controller.QuorumController:1401)
> [2023-06-02 18:17:22,202] INFO [QuorumController id=0] Fencing broker 2 
> because its session has timed out. 
> (org.apache.kafka.controller.ReplicationControlManager:1459)
> [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] handleBrokerFenced: 
> changing partition(s): foo-0, foo-1, foo-2 
> (org.apache.kafka.controller.ReplicationControlManager:1750)
> [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] partition change for 
> foo-0 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 
> -> 3, partitionEpoch: 2 -> 3 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for 
> foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: isr: [2, 3] -> [3], leaderEpoch: 
> 3 -> 4, partitionEpoch: 4 -> 5 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for 
> foo-2 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 
> -> 3, partitionEpoch: 2 -> 3 
> (org.apache.kafka.controller.ReplicationControlManager:157)
> [2023-06-02 18:17:22,205] DEBUG append(batch=LocalRecordBatch(leaderEpoch=1, 
> appendTimestamp=240, 
> records=[ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=1, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=[3], leader=3, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=2, 
> topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, 
> removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at 
> version 0), ApiMessageAndVersion(BrokerRegistrationChangeRecord(brokerId=2, 
> brokerEpoch=3, fenced=1, inControlledShutdown=0) at version 0)]), 
> prevOffset=27) (org.apache.kafka.metalog.LocalLogManager$SharedLogData:253)
> [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Creating in-memory 
> snapshot 27 (org.apache.kafka.timeline.SnapshotRegistry:197)
> [2023-06-02 18:17:22,205] DEBUG [LocalLogManager 0] Node 0: running log 
> check. (org.apache.kafka.metalog.LocalLogManager:512)
> [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Read-write operation 
> maybeFenceReplicas(451616131) will be completed when the log reaches offset 
> 27. (org.apache.kafka.controller.QuorumController:768)
> [2023-06-02 18:17:22,208] INFO [QuorumController id=0] Fencing broker 3 
> because its session has timed out. 
> (org.apache.kafka.controller.ReplicationControlManager:1459)
> [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] handleBrokerFenced: 
> changing partition(s): foo-1 
> (org.apache.kafka.controller.ReplicationControlManager:1750)
> [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] partition change for 
> foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: 

[GitHub] [kafka] showuon merged pull request #13908: KAFKA-15052 Fix the flaky testBalancePartitionLeaders - part II

2023-09-04 Thread via GitHub


showuon merged PR #13908:
URL: https://github.com/apache/kafka/pull/13908


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)

2023-09-04 Thread via GitHub


showuon commented on code in PR #14307:
URL: https://github.com/apache/kafka/pull/14307#discussion_r1314623009


##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##
@@ -56,7 +56,7 @@ public Integer getBrokerId() {
 
 /**
  * Wait until the first segment offset in Apache Kafka storage for the 
given topic-partition is
- * equal or greater to the provided offset.
+ * equal to the provided offset.

Review Comment:
   Thanks for the fix.



##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionTimeTest.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.common.config.TopicConfig;
+
+import java.util.Collections;
+import java.util.Map;
+
+public final class DeleteSegmentsByRetentionTimeTest extends 
DeleteSegmentsByRetentionSizeTest {

Review Comment:
   It's surprising that the retention-time test extends the retention-size test. 
Could we create an abstract `BaseDeleteSegmentsTest` class and put the test spec 
there? Then both the retention-time and retention-size tests can extend 
`BaseDeleteSegmentsTest`, along the lines of the sketch below.
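
   A rough sketch of that hierarchy (in practice the base class would extend the 
tiered-storage test harness and build the shared test spec from the hook below; the 
class and method names here are illustrative, not the actual framework API):

       import java.util.Collections;
       import java.util.Map;
       import org.apache.kafka.common.config.TopicConfig;

       abstract class BaseDeleteSegmentsTest {
           // Shared spec: produce enough records to roll segments, then expect the
           // remote segments to be deleted once the subclass's retention limit is hit.
           protected abstract Map<String, String> retentionConfigs();
       }

       final class DeleteSegmentsByRetentionSizeTest extends BaseDeleteSegmentsTest {
           @Override
           protected Map<String, String> retentionConfigs() {
               return Collections.singletonMap(TopicConfig.RETENTION_BYTES_CONFIG, "1");
           }
       }

       final class DeleteSegmentsByRetentionTimeTest extends BaseDeleteSegmentsTest {
           @Override
           protected Map<String, String> retentionConfigs() {
               return Collections.singletonMap(TopicConfig.RETENTION_MS_CONFIG, "1");
           }
       }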



##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -463,17 +463,21 @@ class BrokerServer(
   new KafkaConfig(config.originals(), true)
 
   // Start RemoteLogManager before broker start serving the requests.
-  remoteLogManagerOpt.foreach(rlm => {
+  remoteLogManagerOpt.foreach { rlm =>
 val listenerName = 
config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName()
 if (listenerName != null) {
-  val endpoint = endpoints.stream.filter(e => 
e.listenerName.equals(ListenerName.normalised(listenerName)))
+  val endpoint = endpoints.stream
+.filter(e =>
+  e.listenerName().isPresent &&
+  
ListenerName.normalised(e.listenerName().get()).equals(ListenerName.normalised(listenerName))
+)

Review Comment:
   Nice catch! 



##
storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectEmptyRemoteStorageAction.java:
##
@@ -36,8 +36,8 @@ public ExpectEmptyRemoteStorageAction(TopicPartition 
topicPartition) {
 public void doExecute(TieredStorageTestContext context) throws 
InterruptedException {
 TestUtils.waitForCondition(() -> {
 LocalTieredStorageSnapshot snapshot = 
context.takeTieredStorageSnapshot();
-return !snapshot.getTopicPartitions().contains(topicPartition) &&
-snapshot.getFilesets(topicPartition).isEmpty();
+// With KAFKA-15166, snapshot should not contain the topicPartition

Review Comment:
   OK, so I think we should add the full explanation to the comment; from the 
current comment it's not clear to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] abhijeetk88 opened a new pull request, #14331: Add documentation for tiered storage metrics

2023-09-04 Thread via GitHub


abhijeetk88 opened a new pull request, #14331:
URL: https://github.com/apache/kafka/pull/14331

   Added documentation for tiered storage metrics.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15430) Kafka creates replica partition on controller node

2023-09-04 Thread Andrii Vysotskiy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrii Vysotskiy updated KAFKA-15430:
-
Description: 
I have configuration 5 nodes (KRAFT mode), with next roles: 4 broker+controller 
and 1 controller. Create topic with replication factor 5, and it is created, 
and describe show that topic partition have 5 replicas.

 

{{/opt/kafka/latest/bin/kafka-topics.sh --create 
--bootstrap-server=dc1-prod-kafka-001-vs:9092 --replication-factor 5 
--partitions 1 --topic test5}}

 

/opt/kafka/latest/bin/kafka-topics.sh --describe --topic test5 
--bootstrap-server=dc1-prod-kafka-001-vs:9092
Topic: test5 TopicId: amuqr8EgRmqeKryUHZwsMA PartitionCount: 1 
ReplicationFactor: 5 Configs: segment.bytes=1073741824
Topic: test5 Partition: 0 Leader: 3 Replicas: 3,4,1,2,5 Isr: 3,4,1,2

 

Replicas lists 5 brokers but the ISR has only 4. Why does Kafka initially allow a 
replica to be assigned to the controller node, although in reality the replica is 
never created on the controller node and there are no topic files in its log 
directory?

Is this expected behavior or not? Thanks.

I want to understand whether such behavior is the norm for Kafka

 

  was:
I have configuration 5 nodes, with next roles: 4 broker+controller and 1 
controller. Create topic with replication factor 5, and it is created, and 
describe show that topic partition have 5 replicas.

 

{{/opt/kafka/latest/bin/kafka-topics.sh --create 
--bootstrap-server=dc1-prod-kafka-001-vs:9092 --replication-factor 5 
--partitions 1 --topic test5}}

 

{{/opt/kafka/latest/bin/kafka-topics.sh --describe --topic test5 
--bootstrap-server=dc1-prod-kafka-001-vs:9092
Topic: test5 TopicId: amuqr8EgRmqeKryUHZwsMA PartitionCount: 1 
ReplicationFactor: 5 Configs: segment.bytes=1073741824
Topic: test5 Partition: 0 Leader: 3 Replicas: 3,4,1,2,5 Isr: 3,4,1,2}}


Replicas 5 and ISR 4. Why does kafka initially allow you to create a replica on 
the controller node, although in reality the replica is not created on the 
controller node and there are no topic files in the log directory.

Is this expected behavior or not? Thanks.

I want to understand whether such behavior is the norm for Kafka



> Kafka creates replica partition on controller node
> 
>
> Key: KAFKA-15430
> URL: https://issues.apache.org/jira/browse/KAFKA-15430
> Project: Kafka
>  Issue Type: Test
>  Components: kraft
>Affects Versions: 3.5.1
>Reporter: Andrii Vysotskiy
>Priority: Minor
>
> I have a 5-node configuration (KRaft mode) with the following roles: 4 
> broker+controller nodes and 1 controller-only node. I create a topic with 
> replication factor 5; it is created, and describe shows that the topic partition 
> has 5 replicas.
>  
> {{/opt/kafka/latest/bin/kafka-topics.sh --create 
> --bootstrap-server=dc1-prod-kafka-001-vs:9092 --replication-factor 5 
> --partitions 1 --topic test5}}
>  
> /opt/kafka/latest/bin/kafka-topics.sh --describe --topic test5 
> --bootstrap-server=dc1-prod-kafka-001-vs:9092
> Topic: test5 TopicId: amuqr8EgRmqeKryUHZwsMA PartitionCount: 1 
> ReplicationFactor: 5 Configs: segment.bytes=1073741824
> Topic: test5 Partition: 0 Leader: 3 Replicas: 3,4,1,2,5 Isr: 3,4,1,2
>  
> Replicas lists 5 brokers but the ISR has only 4. Why does Kafka initially allow a 
> replica to be assigned to the controller node, although in reality the replica is 
> never created on the controller node and there are no topic files in its log 
> directory?
> Is this expected behavior or not? Thanks.
> I want to understand whether such behavior is the norm for Kafka
>  
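
A hedged way to confirm where the replica data actually lives is to ask the broker 
nodes for their log directories via the Admin client. The sketch below is 
illustrative, not part of the report: it assumes node ids 1-4 are the 
broker+controller nodes (as the ISR suggests) and reuses the bootstrap address from 
the commands above; the controller-only node holds no topic data and is not queried.

{code:java}
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.LogDirDescription;

public class CheckReplicaLogDirs {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "dc1-prod-kafka-001-vs:9092");
        try (Admin admin = Admin.create(props)) {
            // Only the broker nodes can answer describeLogDirs; the controller-only node runs no broker role.
            Map<Integer, Map<String, LogDirDescription>> dirs =
                    admin.describeLogDirs(Arrays.asList(1, 2, 3, 4)).allDescriptions().get();
            dirs.forEach((brokerId, byDir) ->
                    byDir.forEach((dir, desc) ->
                            desc.replicaInfos().keySet().stream()
                                    .filter(tp -> tp.topic().equals("test5"))
                                    .forEach(tp -> System.out.printf("broker %d hosts %s in %s%n",
                                            brokerId, tp, dir))));
        }
    }
}
{code}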



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15430) Kafka creates replica partition on controller node

2023-09-04 Thread Andrii Vysotskiy (Jira)
Andrii Vysotskiy created KAFKA-15430:


 Summary: Kafka creates replica partition on controller node
 Key: KAFKA-15430
 URL: https://issues.apache.org/jira/browse/KAFKA-15430
 Project: Kafka
  Issue Type: Test
  Components: kraft
Affects Versions: 3.5.1
Reporter: Andrii Vysotskiy


I have a 5-node configuration with the following roles: 4 broker+controller nodes and 
1 controller-only node. I create a topic with replication factor 5; it is created, and 
describe shows that the topic partition has 5 replicas.

 

{{/opt/kafka/latest/bin/kafka-topics.sh --create 
--bootstrap-server=dc1-prod-kafka-001-vs:9092 --replication-factor 5 
--partitions 1 --topic test5}}

 

{{/opt/kafka/latest/bin/kafka-topics.sh --describe --topic test5 
--bootstrap-server=dc1-prod-kafka-001-vs:9092
Topic: test5 TopicId: amuqr8EgRmqeKryUHZwsMA PartitionCount: 1 
ReplicationFactor: 5 Configs: segment.bytes=1073741824
Topic: test5 Partition: 0 Leader: 3 Replicas: 3,4,1,2,5 Isr: 3,4,1,2}}


Replicas lists 5 brokers but the ISR has only 4. Why does Kafka initially allow a 
replica to be assigned to the controller node, although in reality the replica is 
never created on the controller node and there are no topic files in its log 
directory?

Is this expected behavior or not? Thanks.

I want to understand whether such behavior is the norm for Kafka




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] omkreddy merged pull request #14318: KAFKA-15422: Update documentation for Delegation Tokens in Kafka with KRaft

2023-09-04 Thread via GitHub


omkreddy merged PR #14318:
URL: https://github.com/apache/kafka/pull/14318


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] omkreddy commented on pull request #14318: KAFKA-15422: Update documentation for Delegation Tokens in Kafka with KRaft

2023-09-04 Thread via GitHub


omkreddy commented on PR #14318:
URL: https://github.com/apache/kafka/pull/14318#issuecomment-1704861029

   Test failures are not related, merging this minor doc change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] elkkhan commented on a diff in pull request #14077: KAFKA-14112: Expose replication-offset-lag Mirror metric

2023-09-04 Thread via GitHub


elkkhan commented on code in PR #14077:
URL: https://github.com/apache/kafka/pull/14077#discussion_r1314605208


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceMetrics.java:
##
@@ -104,12 +108,25 @@ class MirrorSourceMetrics implements AutoCloseable {
 replicationLatencyAvg = new MetricNameTemplate(
 "replication-latency-ms-avg", SOURCE_CONNECTOR_GROUP,
 "Average time it takes records to replicate from source to 
target cluster.", partitionTags);
+replicationOffsetLag = new MetricNameTemplate(

Review Comment:
   @hudeqi I hear you now - we do have a disagreement about the lag definition 
and it was an oversight on my end. I will copy your reply over to the mailing 
thread and will answer there to keep the discussion in one place



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on a diff in pull request #14077: KAFKA-14112: Expose replication-offset-lag Mirror metric

2023-09-04 Thread via GitHub


hudeqi commented on code in PR #14077:
URL: https://github.com/apache/kafka/pull/14077#discussion_r1314592488


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceMetrics.java:
##
@@ -104,12 +108,25 @@ class MirrorSourceMetrics implements AutoCloseable {
 replicationLatencyAvg = new MetricNameTemplate(
 "replication-latency-ms-avg", SOURCE_CONNECTOR_GROUP,
 "Average time it takes records to replicate from source to 
target cluster.", partitionTags);
+replicationOffsetLag = new MetricNameTemplate(

Review Comment:
   I guess we have a disagreement about lag? My understanding of lag is: the 
real LEO of the source cluster partition minus the latest offset that has been 
written to the target cluster. It seems that your definition of lag is: the delay 
between the mirror task polling the data and writing it to the target cluster?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest

2023-09-04 Thread via GitHub


dajac commented on code in PR #14321:
URL: https://github.com/apache/kafka/pull/14321#discussion_r1314590003


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -538,17 +537,46 @@ public void validateOffsetCommit(
 if (memberEpoch < 0 && members().isEmpty()) return;
 
 final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, 
false);
-if (memberEpoch != member.memberEpoch()) {
-throw Errors.STALE_MEMBER_EPOCH.exception();
-}
+validateMemberEpoch(memberEpoch, member.memberEpoch());
 }
 
 /**
  * Validates the OffsetFetch request.
+ *
+ * @param memberId  The member id for consumer groups.
+ * @param memberEpoch   The member epoch for consumer groups.
+ * @param lastCommittedOffset   The last committed offsets in the timeline.
  */
 @Override
-public void validateOffsetFetch() {
-// Nothing.
+public void validateOffsetFetch(
+String memberId,
+int memberEpoch,
+long lastCommittedOffset
+) throws UnknownMemberIdException, StaleMemberEpochException {
+// When the member epoch is -1, the request comes from the admin 
client. In this case,
+// the request can commit offsets if the group is empty.

Review Comment:
   The comment is wrong. Let me fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on a diff in pull request #14077: KAFKA-14112: Expose replication-offset-lag Mirror metric

2023-09-04 Thread via GitHub


hudeqi commented on code in PR #14077:
URL: https://github.com/apache/kafka/pull/14077#discussion_r1314583243


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##
@@ -282,6 +285,38 @@ SourceRecord convertRecord(ConsumerRecord 
record) {
 record.timestamp(), headers);
 }
 
+//visible for testing
+void reportReplicationOffsetLag(ConsumerRecords 
lastPolledRecords) {
+Set partitions = lastPolledRecords.partitions();
+partitions.forEach(p -> {
+try {
+long replicationOffsetLag = 
getReplicationOffsetLagForPartition(p, lastPolledRecords.records(p));
+if (replicationOffsetLag < 0) {
+log.warn("Replication offset lag for partition {} is 
negative({}) - " +
+"skipping metric reporting for this partition.", 
p, replicationOffsetLag);
+return;
+}
+metrics.replicationOffsetLag(p, replicationOffsetLag + 1); 
//+1 to account for zero-based offset numbering
+} catch (UnsupportedOperationException e) {
+log.error("Failed to calculate replication offset lag for 
partition {}.", p, e);
+}
+});
+}
+
+private long getReplicationOffsetLagForPartition(TopicPartition partition,
+ 
List> lastPolledRecordsForPartition) {
+ConsumerRecord lastPolledRecord =
+
lastPolledRecordsForPartition.get(lastPolledRecordsForPartition.size() - 1);
+if (!lastPolledRecord.topic().equals(partition.topic()) || 
lastPolledRecord.partition() != partition.partition()) {
+String error = String.format(
+"Unexpected topic/partition mismatch while calculating 
replication-offset-lag. Expected: %s, got: %s-%s.",
+partition, lastPolledRecord.topic(), 
lastPolledRecord.partition());
+throw new UnsupportedOperationException(error);
+}
+long endOffsetForPartition = lastPolledRecord.offset();

Review Comment:
   My doubt is here: I think the LEO of the partition should be the log end 
offset of the partition in the source cluster, but `lastPolledRecord.offset()` 
here represents only the offset (in the source cluster) of the last record the 
task has polled for that partition. That is to say, the log end offset in the 
source cluster may have reached 100 while, due to the task's limited consumer 
throughput, it has actually only polled up to offset 80, so the lag must be 
greater than 20 (greater because the just-polled data has not yet been written to 
the target cluster; I think we agree on this). But if you follow the logic in this 
PR, the LEO would be taken as 80, even though the source cluster has in fact been 
written up to offset 100.
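
   A minimal sketch of the lag definition described above, assuming the task can 
ask the source cluster for the real log end offset (the class and parameter names 
are illustrative, not the PR's implementation):

       import java.util.Collections;
       import org.apache.kafka.clients.consumer.Consumer;
       import org.apache.kafka.common.TopicPartition;

       final class ReplicationLagSketch {
           // lag = source-cluster log end offset minus the next offset still to be replicated
           static long replicationOffsetLag(Consumer<byte[], byte[]> sourceConsumer,
                                            TopicPartition partition,
                                            long lastReplicatedSourceOffset) {
               long sourceLogEndOffset =
                       sourceConsumer.endOffsets(Collections.singleton(partition)).get(partition);
               return Math.max(0L, sourceLogEndOffset - (lastReplicatedSourceOffset + 1));
           }
       }

   With this definition, a source LEO of 100 and a last replicated offset of 79 
gives a lag of 20, independent of how far behind the task's own poll loop is.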



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)

2023-09-04 Thread via GitHub


clolov commented on PR #14307:
URL: https://github.com/apache/kafka/pull/14307#issuecomment-1704805274

   I will aim to provide a review by the end of the day! Thanks for the effort


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15399) Enable OffloadAndConsumeFromLeader test

2023-09-04 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761727#comment-17761727
 ] 

Christo Lolov commented on KAFKA-15399:
---

Heya [~pnee], [~lianetm] and [~showuon]!

I had a look at the last 5 builds on trunk since 
[https://github.com/apache/kafka/pull/14319] was merged in and none of them 
contain the test failure detailed in 
https://issues.apache.org/jira/browse/KAFKA-15427:
 * 
[https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2164/#showFailuresLink]
 * [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2165/]
 * 
[https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2166/#showFailuresLink]
 * [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2167/]
 * [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2168/]

I believe the issue is resolved so I will keep this Jira ticket as resolved as 
well, but if you manage to find an occurrence I will gladly reopen it!

> Enable OffloadAndConsumeFromLeader test
> ---
>
> Key: KAFKA-15399
> URL: https://issues.apache.org/jira/browse/KAFKA-15399
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.6.0
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Blocker
> Fix For: 3.6.0
>
>
> Build / JDK 17 and Scala 2.13 / initializationError – 
> org.apache.kafka.tiered.storage.integration.OffloadAndConsumeFromLeaderTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ocadaruma commented on pull request #14242: KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance

2023-09-04 Thread via GitHub


ocadaruma commented on PR #14242:
URL: https://github.com/apache/kafka/pull/14242#issuecomment-1704786562

   @showuon 
   Thank you for your review.
   
   0) Got it. I revised the PR description to include analysis.
   2) The actual change is made against `SnapshotFile.java#renameTo`, which is 
called from removeAndMarkSnapshotForDeletion.
   3)
   
   > We can always recover from logs when unclean shutdown.
   
   Yes. However, to be precise, removing the fsync on `LeaderEpochFileCache` 
truncation doesn't cause extra recovery even on an unclean shutdown, IMO. The 
reasons:
   - Since we still fsync on `LeaderEpochFileCache#assign`, we can still ensure 
all necessary leader epochs are in the leader-epoch cache file.
   - Even when a truncation is not flushed (so "should-be-truncated" epochs may 
be left in the epoch file after an unclean shutdown), the log-loading procedure 
truncates the epoch file as necessary (based on the log start/end offsets). That 
is a fairly light-weight operation compared to recovering from the log.

   4) Hmm, I intentionally didn't create an overloaded method because I was a bit 
afraid that the default (fsync: true) method would be used casually in future 
code changes, even in places where fsync isn't necessary.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #14242: KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance

2023-09-04 Thread via GitHub


showuon commented on PR #14242:
URL: https://github.com/apache/kafka/pull/14242#issuecomment-1704740274

   @ocadaruma , thanks for the improvement! Some high-level questions:
   0. Although you've added comments in the JIRA, it'd be better to also add your 
analysis and what/why you've changed to the PR description.
   1. Moving the fsync call to the scheduler thread in the `takeSnapshot` case 
makes sense to me, since we have all the info in the in-memory cache, and log 
recovery can rebuild the snapshot after an unclean shutdown.
   2. For `removeAndMarkSnapshotForDeletion`, I didn't see this fix; could you 
explain it?
   3. For `LeaderEpochFileCache#truncateXXX`, I agree that as long as the 
in-memory cache is up to date, it should be fine. We can always recover from the 
logs after an unclean shutdown.
   4. nit: In the PR, could we make fewer code changes for easier review? That 
is, we could create an overloaded method that takes one more parameter (boolean 
sync) and delegate the original method implementation to it (defaulting to true). 
Then the only places we need to change are the ones where we want to pass `false` 
for the sync flush, which would make the PR much clearer IMO - something like the 
sketch below.
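
   A minimal sketch of that overload pattern, using a hypothetical 
checkpoint-style writer (the class and file handling below are illustrative, not 
the actual Kafka types):

       import java.io.IOException;
       import java.nio.ByteBuffer;
       import java.nio.channels.FileChannel;
       import java.nio.charset.StandardCharsets;
       import java.nio.file.Path;
       import java.nio.file.StandardOpenOption;
       import java.util.List;

       final class CheckpointWriter {
           private final Path file;

           CheckpointWriter(Path file) {
               this.file = file;
           }

           // Existing callers keep the old signature and the old fsync-on-write semantics.
           void write(List<String> entries) throws IOException {
               write(entries, true);
           }

           // Callers that only need the data in the page cache can pass sync = false.
           void write(List<String> entries, boolean sync) throws IOException {
               try (FileChannel channel = FileChannel.open(file,
                       StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
                   for (String entry : entries) {
                       channel.write(ByteBuffer.wrap((entry + System.lineSeparator()).getBytes(StandardCharsets.UTF_8)));
                   }
                   if (sync) {
                       channel.force(true); // fsync only when the caller asked for durability
                   }
               }
           }
       }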
   
   Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov commented on pull request #14217: KAFKA-14595 ReassignPartitionsCommandArgsTest rewritten in java

2023-09-04 Thread via GitHub


nizhikov commented on PR #14217:
URL: https://github.com/apache/kafka/pull/14217#issuecomment-1704725921

   Hello @gharris1727 Thanks for the review. Are you ready to merge this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)

2023-09-04 Thread via GitHub


satishd commented on code in PR #14329:
URL: https://github.com/apache/kafka/pull/14329#discussion_r1314498306


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java:
##
@@ -98,35 +98,28 @@ public void waitTillConsumptionCatchesUp(RecordMetadata 
recordMetadata) throws T
  */
 public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata,
  long timeoutMs) throws 
TimeoutException {
-final int partition = recordMetadata.partition();
-final long consumeCheckIntervalMs = 
Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs);
-
-log.info("Waiting until consumer is caught up with the target 
partition: [{}]", partition);
-
+int partition = recordMetadata.partition();
 // If the current assignment does not have the subscription for this 
partition then return immediately.
 if (!consumerTask.isMetadataPartitionAssigned(partition)) {
-throw new KafkaException("This consumer is not assigned to the 
target partition " + partition + ". " +
-"Partitions currently assigned: " + 
consumerTask.metadataPartitionsAssigned());
+throw new KafkaException("This consumer is not assigned to the 
target partition " + partition +
+". Currently assigned partitions: " + 
consumerTask.metadataPartitionsAssigned());
 }
-
-final long offset = recordMetadata.offset();
+long offset = recordMetadata.offset();
 long startTimeMs = time.milliseconds();
+long consumeCheckIntervalMs = Math.min(CONSUME_RECHECK_INTERVAL_MS, 
timeoutMs);
+log.info("Wait until the consumer is caught up with the target 
partition {} up-to offset {}", partition, offset);
 while (true) {
-log.debug("Checking if partition [{}] is up to date with offset 
[{}]", partition, offset);
 long readOffset = 
consumerTask.readOffsetForMetadataPartition(partition).orElse(-1L);
 if (readOffset >= offset) {
 return;
 }
-
-log.debug("Expected offset [{}] for partition [{}], but the 
committed offset: [{}],  Sleeping for [{}] to retry again",
-offset, partition, readOffset, consumeCheckIntervalMs);
-
+log.debug("Expected offset for partition {} is {}, but the read 
offset is {}. " +
+"Sleeping for {} ms to retry again", partition, offset, 
readOffset, consumeCheckIntervalMs);
 if (time.milliseconds() - startTimeMs > timeoutMs) {
-log.warn("Expected offset for partition:[{}] is : [{}], but 
the committed offset: [{}] ",
-partition, readOffset, offset);
+log.warn("Expected offset for partition {} is {}, but the read 
offset is {}",
+partition, offset, readOffset);

Review Comment:
   Good catch on the ordering of the offsets in the log. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


