[GitHub] [kafka] kamalcph commented on pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)
kamalcph commented on PR #14329: URL: https://github.com/apache/kafka/pull/14329#issuecomment-1705975965 Rebased the PR against trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan merged pull request #14324: KAFKA-15424: Make the transaction verification a dynamic configuration
jolshan merged PR #14324: URL: https://github.com/apache/kafka/pull/14324
[jira] [Commented] (KAFKA-15309) Add custom error handler to Producer
[ https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761924#comment-17761924 ]

Matthias J. Sax commented on KAFKA-15309:
-----------------------------------------

Sure, the ticket is up for grabs.

Note that we will need a KIP for this to get a proper and approved design -> [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]

Let us know if you have any questions about the KIP process.

> Add custom error handler to Producer
> ------------------------------------
>
>                 Key: KAFKA-15309
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15309
>             Project: Kafka
>          Issue Type: New Feature
>          Components: producer
>            Reporter: Matthias J. Sax
>            Priority: Major
>              Labels: needs-kip
>         Attachments: KafkaProducerReproducer.java, app.log
>
>
> The producer collects multiple records into batches, and a single
> record-specific error might fail the whole batch (e.g., `RecordTooLargeException`).
> This ticket suggests adding a per-record error handler that allows users to
> opt into skipping bad records without failing the whole batch (similar to
> Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused
> https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.
> Another example for which a production exception handler could be useful: if
> a user tries to write into a non-existing topic, which returns a retryable
> error code, then with infinite retries the producer would hang retrying forever. A
> handler could help to break the infinite retry loop.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
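To make the idea in the ticket concrete, here is a minimal, self-contained sketch of what a per-record error handler could look like. It deliberately does not use the real producer API: `RecordErrorHandler`, `Response`, and `sendBatch` are hypothetical names invented for this illustration, record length stands in for the real size check, and the actual design would have to come out of the KIP.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PerRecordErrorHandlerSketch {

    /** Hypothetical handler decision; not part of the Kafka producer API. */
    enum Response { SKIP, FAIL }

    /** Hypothetical callback invoked once for each record that cannot be sent. */
    interface RecordErrorHandler {
        Response handle(String record, Exception error);
    }

    /**
     * Toy stand-in for sending a batch: a record longer than maxLength is "bad".
     * Returns the accepted records, or throws if the handler answers FAIL.
     */
    static List<String> sendBatch(List<String> batch, int maxLength, RecordErrorHandler handler) {
        List<String> accepted = new ArrayList<>();
        for (String record : batch) {
            if (record.length() > maxLength) {
                Exception error = new IllegalArgumentException("record too large: " + record.length());
                if (handler.handle(record, error) == Response.FAIL) {
                    // Behaviour without a handler: one bad record fails the whole batch.
                    throw new IllegalStateException("batch failed", error);
                }
                continue; // SKIP: drop only the bad record and keep going
            }
            accepted.add(record);
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("a", "this-record-is-too-large", "b");
        // Opt into skipping bad records instead of failing the batch.
        List<String> sent = sendBatch(batch, 8, (record, error) -> Response.SKIP);
        System.out.println(sent);
    }
}
```

A handler that returns `FAIL` reproduces the all-or-nothing behaviour described in the ticket; returning `SKIP` is the opt-in part.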
[jira] [Comment Edited] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()
[ https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761918#comment-17761918 ]

Matthias J. Sax edited comment on KAFKA-15417 at 9/5/23 1:57 AM:
-----------------------------------------------------------------

I believe it could be related to an internal "throttling" mechanism that was added to improve throughput.

Can you try to set config `__emit.interval.ms.kstreams.outer.join.spurious.results.fix__` to zero to see if it resolves the issue?


was (Author: mjsax):
I believe it could be related to an internal "throttling" mechanism that was added to improve throughput.

Can you try to set config `__emit.interval.ms.kstreams.outer.join.spurious.results.fix__` to zero to see if it resolves the issue?

> JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-15417
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15417
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.4.0
>            Reporter: Victor van den Hoven
>            Priority: Major
>         Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>
> However if I use a joinWindow with *before = time-difference and after = 0*
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with
> _joinWindow.after=0 and joinWindow.before=30s_, any new message on
> stream 1 that can not be joined with any messages on stream2 should be joined
> with a null-record after the _joinWindow.after_ has ended and a new message
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the
> previous message will be joined with a null-record.
>
> Attached you can find two files with a TopologyTestDriver Unit test to
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) -> stream1
> after 4000ms message2(key2) -> stream1 -> NO null-record join was made, but
> the after period was expired.
> after 4900ms message2(key2) -> stream1 -> NO null-record join was made, but
> the after period was expired.
> after 5000ms message2(key2) -> stream1 -> A null-record join was made,
> before period was expired.
> after 6000ms message2(key2) -> stream1 -> A null-record join was made,
> before period was expired.
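For anyone trying the suggestion in the comment, here is a minimal sketch of setting that internal config to zero using plain `java.util.Properties`. The config key is the one quoted in the comment; the `application.id` and `bootstrap.servers` values are placeholders, the class name is made up for this example, and passing the value as a `Long` is an assumption (the thread does not say which value types are accepted).

```java
import java.util.Properties;

public class OuterJoinEmitIntervalConfigSketch {

    // Internal config name exactly as quoted in the comment.
    static final String EMIT_INTERVAL_CONFIG =
        "__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";

    /** Builds Streams properties with the outer-join emit interval set to zero. */
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("application.id", "join-window-repro");  // placeholder value
        props.put("bootstrap.servers", "localhost:9092");  // placeholder value
        props.put(EMIT_INTERVAL_CONFIG, 0L);               // zero, as suggested
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().get(EMIT_INTERVAL_CONFIG));
    }
}
```

In a `TopologyTestDriver` reproduction such as the attached test, these properties would typically be the ones handed to the driver together with the topology.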
[jira] [Commented] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()
[ https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761918#comment-17761918 ]

Matthias J. Sax commented on KAFKA-15417:
-----------------------------------------

I believe it could be related to an internal "throttling" mechanism that was added to improve throughput.

Can you try to set config `__emit.interval.ms.kstreams.outer.join.spurious.results.fix__` to zero to see if it resolves the issue?

> JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-15417
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15417
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.4.0
>            Reporter: Victor van den Hoven
>            Priority: Major
>         Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>
> However if I use a joinWindow with *before = time-difference and after = 0*
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with
> _joinWindow.after=0 and joinWindow.before=30s_, any new message on
> stream 1 that can not be joined with any messages on stream2 should be joined
> with a null-record after the _joinWindow.after_ has ended and a new message
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the
> previous message will be joined with a null-record.
>
> Attached you can find two files with a TopologyTestDriver Unit test to
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) -> stream1
> after 4000ms message2(key2) -> stream1 -> NO null-record join was made, but
> the after period was expired.
> after 4900ms message2(key2) -> stream1 -> NO null-record join was made, but
> the after period was expired.
> after 5000ms message2(key2) -> stream1 -> A null-record join was made,
> before period was expired.
> after 6000ms message2(key2) -> stream1 -> A null-record join was made,
> before period was expired.
[jira] [Updated] (KAFKA-15383) Replace EasyMock with Mockito for KTableImplTest
[ https://issues.apache.org/jira/browse/KAFKA-15383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-15383:
------------------------------------
    Component/s: streams
                 unit tests

> Replace EasyMock with Mockito for KTableImplTest
> ------------------------------------------------
>
>                 Key: KAFKA-15383
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15383
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams, unit tests
>            Reporter: Fei Xie
>            Priority: Minor
[jira] [Updated] (KAFKA-15385) Replace EasyMock with Mockito for AbstractStreamTest
[ https://issues.apache.org/jira/browse/KAFKA-15385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-15385:
------------------------------------
    Component/s: streams
                 unit tests

> Replace EasyMock with Mockito for AbstractStreamTest
> ----------------------------------------------------
>
>                 Key: KAFKA-15385
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15385
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams, unit tests
>            Reporter: Fei Xie
>            Priority: Minor
[jira] [Updated] (KAFKA-15382) Replace EasyMock with Mockito for KStreamTransformValuesTest
[ https://issues.apache.org/jira/browse/KAFKA-15382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-15382:
------------------------------------
    Component/s: streams
                 unit tests

> Replace EasyMock with Mockito for KStreamTransformValuesTest
> ------------------------------------------------------------
>
>                 Key: KAFKA-15382
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15382
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams, unit tests
>            Reporter: Fei Xie
>            Priority: Minor
[jira] [Updated] (KAFKA-15384) Replace EasyMock with Mockito for KTableTransformValuesTest
[ https://issues.apache.org/jira/browse/KAFKA-15384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-15384:
------------------------------------
    Component/s: streams
                 unit tests

> Replace EasyMock with Mockito for KTableTransformValuesTest
> -----------------------------------------------------------
>
>                 Key: KAFKA-15384
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15384
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams, unit tests
>            Reporter: Fei Xie
>            Priority: Minor
[GitHub] [kafka] satishd merged pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)
satishd merged PR #14307: URL: https://github.com/apache/kafka/pull/14307
[GitHub] [kafka] satishd commented on pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)
satishd commented on PR #14307: URL: https://github.com/apache/kafka/pull/14307#issuecomment-1705769550 A couple of unrelated tests failed in the Jenkins jobs; merging to trunk and 3.6.
[jira] [Resolved] (KAFKA-14936) Add Grace Period To Stream Table Join
[ https://issues.apache.org/jira/browse/KAFKA-14936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Walker Carlson resolved KAFKA-14936.
------------------------------------
    Resolution: Done

> Add Grace Period To Stream Table Join
> -------------------------------------
>
>                 Key: KAFKA-14936
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14936
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Walker Carlson
>            Assignee: Walker Carlson
>            Priority: Major
>              Labels: kip, streams
>             Fix For: 3.6.0
>
>
> Include the grace period for stream table joins as described in kip 923.
> Also add a rocksDB time based queueing implementation of
> `TimeOrderedKeyValueBuffer`
[jira] [Updated] (KAFKA-14936) Add Grace Period To Stream Table Join
[ https://issues.apache.org/jira/browse/KAFKA-14936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Walker Carlson updated KAFKA-14936:
-----------------------------------
    Fix Version/s: 3.6.0

> Add Grace Period To Stream Table Join
> -------------------------------------
>
>                 Key: KAFKA-14936
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14936
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Walker Carlson
>            Assignee: Walker Carlson
>            Priority: Major
>              Labels: kip, streams
>             Fix For: 3.6.0
>
>
> Include the grace period for stream table joins as described in kip 923.
> Also add a rocksDB time based queueing implementation of
> `TimeOrderedKeyValueBuffer`
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1315178928

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java: ##
@@ -0,0 +1,1070 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class OptimizedUniformAssignmentBuilderTest {
+    private final UniformAssignor assignor = new UniformAssignor();
+    private final Uuid topic1Uuid = Uuid.fromString("T1-A4s3VTwiI5CTbEp6POw");
+    private final Uuid topic2Uuid = Uuid.fromString("T2-B4s3VTwiI5YHbPp6YUe");
+    private final Uuid topic3Uuid = Uuid.fromString("T3-CU8fVTLCz5YMkLoDQsa");
+    private final String topic1Name = "topic1";
+    private final String topic2Name = "topic2";
+    private final String topic3Name = "topic3";
+    private final String memberA = "A";
+    private final String memberB = "B";
+    private final String memberC = "C";
+
+    @Test
+    public void testOneMemberNoTopicSubscription() {
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(
+            Collections.singletonMap(
+                topic1Uuid,
+                new TopicMetadata(
+                    topic1Uuid,
+                    topic1Name,
+                    3,
+                    mkMapOfPartitionRacks(3)
+                )
+            )
+        );
+
+        Map members = Collections.singletonMap(
+            memberA,
+            new AssignmentMemberSpec(
+                Optional.empty(),
+                Optional.empty(),
+                Collections.emptyList(),
+                Collections.emptyMap()
+            )
+        );
+
+        AssignmentSpec assignmentSpec = new AssignmentSpec(members);
+        GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+
+        assertEquals(Collections.emptyMap(), groupAssignment.members());
+    }
+
+    @Test
+    public void testOneMemberSubscribedToNonexistentTopic() {
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(
+            Collections.singletonMap(
+                topic1Uuid,
+                new TopicMetadata(
+                    topic1Uuid,
+                    topic1Name,
+                    3,
+                    mkMapOfPartitionRacks(3)
+                )
+            )
+        );
+
+        Map members = Collections.singletonMap(
+            memberA,
+            new AssignmentMemberSpec(
+                Optional.empty(),
+                Optional.empty(),
+                Collections.singletonList(topic2Uuid),
+                Collections.emptyMap()
+            )
+        );
+
+        AssignmentSpec assignmentSpec = new AssignmentSpec(members);
+        GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+
+        assertEquals(Collections.emptyMap(), groupAssignment.members());
+    }
+
+    @Test
+    public void testFirstAssignmentTwoMembersSubscribedToTwoTopicsNoMemberRacks() {
+        Map topicMetadata = new HashMap<>();
+        topicMetadata.put(topic1Uuid, new TopicMetadata(
+            topic1Uuid,
+            topic1Name,
+            3,
+            mkMapOfPartitionRacks(3)
+        ));
+
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1315178781

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java: ##
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * The Uniform Assignor distributes Kafka topic partitions among group members for balanced assignment.
+ * The assignor employs two different strategies based on the nature of topic
+ * subscriptions across the group members:
+ *
+ *
+ * Optimized Uniform Assignment Builder: This strategy is used when all members have subscribed
+ * to the same set of topics.
+ *
+ *
+ * General Uniform Assignment Builder: This strategy is used when members have varied topic
+ * subscriptions.
+ *
+ *
+ *
+ * The appropriate strategy is automatically chosen based on the current members' topic subscriptions.
+ *
+ * @see OptimizedUniformAssignmentBuilder
+ * @see GeneralUniformAssignmentBuilder
+ */
+public class UniformAssignor implements PartitionAssignor {
+    private static final Logger log = LoggerFactory.getLogger(UniformAssignor.class);
+    public static final String UNIFORM_ASSIGNOR_NAME = "uniform";
+
+    @Override
+    public String name() {
+        return UNIFORM_ASSIGNOR_NAME;
+    }
+
+    /**
+     * Perform the group assignment given the current members and
+     * topic metadata.
+     *
+     * @param assignmentSpec           The member assignment spec.
+     * @param subscribedTopicDescriber The topic and cluster metadata describer {@link SubscribedTopicDescriber}.
+     * @return The new assignment for the group.
+     */
+    @Override
+    public GroupAssignment assign(
+        AssignmentSpec assignmentSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
+    ) throws PartitionAssignorException {
+
+        AbstractAssignmentBuilder assignmentBuilder;
+        if (allSubscriptionsEqual(assignmentSpec.members())) {
+            log.debug("Detected that all members are subscribed to the same set of topics, invoking the "
+                + "optimized assignment algorithm");
+            assignmentBuilder = new OptimizedUniformAssignmentBuilder(assignmentSpec, subscribedTopicDescriber);
+        } else {
+            assignmentBuilder = new GeneralUniformAssignmentBuilder();
+            log.debug("Detected that all members are subscribed to a different set of topics, invoking the "
+                + "general assignment algorithm");
+        }
+        return assignmentBuilder.buildAssignment();
+    }
+
+    /**
+     * Determines if all members are subscribed to the same list of topic IDs.
+     *
+     * @param members A map of member identifiers to their respective {@code AssignmentMemberSpec}.
+     *                Assumes the map is non-empty.
+     * @return true if all members have the same subscription list of topic IDs,
+     *         false otherwise.
+     */
+    private boolean allSubscriptionsEqual(Map members) {
+        Set firstSubscriptionSet = new HashSet<>(members.values().iterator().next().subscribedTopicIds());

Review Comment:
added check but I don't think we need to throw an illegal state exception
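For readers following this review thread, a standalone sketch of an "all subscriptions equal" check with a guard for the empty case. This is not the PR's code: plain strings stand in for member IDs and topic IDs, the class name is invented, and returning `true` for an empty map (rather than throwing) is only one possible reading of "added check".

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SubscriptionEqualitySketch {

    /**
     * Returns true if every member subscribes to the same set of topics.
     * Assumption for this sketch: an empty member map counts as "all equal"
     * instead of raising an exception.
     */
    static boolean allSubscriptionsEqual(Map<String, List<String>> members) {
        if (members.isEmpty()) {
            return true; // guard: no first element to compare against
        }
        Set<String> first = null;
        for (List<String> topics : members.values()) {
            Set<String> current = new HashSet<>(topics); // order and duplicates ignored
            if (first == null) {
                first = current;
            } else if (!first.equals(current)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, List<String>> members = new HashMap<>();
        members.put("A", Arrays.asList("topic1", "topic2"));
        members.put("B", Arrays.asList("topic2", "topic1"));
        System.out.println(allSubscriptionsEqual(members));
    }
}
```

Comparing sets rather than lists means members that list the same topics in a different order still take the optimized path.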
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1315178560

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ##
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns Kafka partitions to members of a consumer group ensuring a balanced distribution with
+ * considerations for sticky assignments and rack-awareness.
+ * The order of priority of properties during the assignment will be: balance > rack matching (when applicable) > stickiness.
+ *
+ * Here's the step-by-step breakdown of the assignment process:
+ *
+ *
+ * Compute the quotas of partitions for each member based on the total partitions and member count.
+ * For existing assignments, retain partitions based on the determined quota and member's rack compatibility.
+ * If a partition's rack mismatches with its member, track it with its prior owner.
+ * Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions.
+ * Derive the unassigned partitions by taking the difference between total partitions and the sticky assignments.
+ * Depending on members needing extra partitions, select members from the potentially unfilled list and add them to the unfilled list.
+ * Proceed with a round-robin assignment adhering to rack awareness.
+ * For each unassigned partition, locate the first compatible member from the unfilled list.
+ * If no rack-compatible member is found, revert to the tracked current owner.
+ * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment.
+ *
+ */
+public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder {
+    private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+    private final AssignmentSpec assignmentSpec;
+    private final SubscribedTopicDescriber subscribedTopicDescriber;
+    // List of topics subscribed to by all members.
+    private final List subscriptionList;
+    private final RackInfo rackInfo;
+    // Count of members to receive an extra partition beyond the minimum quota,
+    // to account for the distribution of the remaining partitions.
+    private int remainingMembersToGetExtraPartition;
+    // Map of members to the remaining number of partitions needed to meet the minimum quota,
+    // including members eligible for an extra partition.
+    private final Map potentiallyUnfilledMembers;
+    // Members mapped to the remaining number of partitions needed to meet the full quota.
+    // Full quota = minQuota + one extra partition (if applicable).
+    private Map unfilledMembers;
+    private List unassignedPartitions;
+    private final Map newAssignment;
+    // Tracks the current owner of each partition when using rack-aware strategy.
+    // Current refers to the existing assignment.
+    private final Map currentPartitionOwners;
+    // Indicates if a rack aware assignment can be done.
+    // True if racks are defined for both members and partitions.
+    boolean useRackAwareStrategy;
+
+    OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
+        this.assignmentSpec = assignmentSpec;
+        this.subscribedTopicDescriber = subscribedTopicDescriber;
+        subscriptionList = new ArrayList<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
+
+        RackInfo rackInfo = new RackInfo(assignmentSpec,
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1315178473 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * Assigns Kafka partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * The order of priority of properties during the assignment will be: balance > rack matching (when applicable) > stickiness. 
+ * + * Here's the step-by-step breakdown of the assignment process: + * + * + * Compute the quotas of partitions for each member based on the total partitions and member count. + * For existing assignments, retain partitions based on the determined quota and member's rack compatibility. + * If a partition's rack mismatches with its member, track it with its prior owner. + * Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions. + * Derive the unassigned partitions by taking the difference between total partitions and the sticky assignments. + * Depending on members needing extra partitions, select members from the potentially unfilled list and add them to the unfilled list. + * Proceed with a round-robin assignment adhering to rack awareness. + * For each unassigned partition, locate the first compatible member from the unfilled list. + * If no rack-compatible member is found, revert to the tracked current owner. + * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment. + * + */ +public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder { +private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); +private final AssignmentSpec assignmentSpec; +private final SubscribedTopicDescriber subscribedTopicDescriber; +// List of topics subscribed to by all members. +private final List subscriptionList; +private final RackInfo rackInfo; +// Count of members to receive an extra partition beyond the minimum quota, +// to account for the distribution of the remaining partitions. +private int remainingMembersToGetExtraPartition; +// Map of members to the remaining number of partitions needed to meet the minimum quota, +// including members eligible for an extra partition. 
+private final Map potentiallyUnfilledMembers; +// Members mapped to the remaining number of partitions needed to meet the full quota. +// Full quota = minQuota + one extra partition (if applicable). +private Map unfilledMembers; +private List unassignedPartitions; +private final Map newAssignment; +// Tracks the current owner of each partition when using rack-aware strategy. +// Current refers to the existing assignment. +private final Map currentPartitionOwners; +// Indicates if a rack aware assignment can be done. +// True if racks are defined for both members and partitions. +boolean useRackAwareStrategy; + +OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.assignmentSpec = assignmentSpec; +this.subscribedTopicDescriber = subscribedTopicDescriber; +subscriptionList = new ArrayList<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds()); Review Comment: as discussed on call, I added a check in the
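The first step of the Javadoc above (computing per-member quotas) can be sketched as follows. This is a minimal illustration, not the code from the PR: it assumes the minimum quota is the integer division of total partitions by member count and that the remainder is handed out as one extra partition to some members. The class name `QuotaSketch`, the method `quotas`, and the choice to give the extras to the first members in iteration order are all hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class QuotaSketch {
    // Returns member -> target partition count.
    // Assumption of this sketch: the first (total % members) members in
    // iteration order receive one extra partition on top of the minimum quota.
    static Map<String, Integer> quotas(List<String> members, int totalPartitions) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("at least one member is required");
        }
        int minQuota = totalPartitions / members.size();
        int membersToGetExtra = totalPartitions % members.size();
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String member : members) {
            int quota = minQuota;
            if (membersToGetExtra > 0) {
                quota++;
                membersToGetExtra--;
            }
            result.put(member, quota);
        }
        return result;
    }

    public static void main(String[] args) {
        // 7 partitions over 3 members: minQuota = 2, one member gets an extra.
        System.out.println(quotas(Arrays.asList("a", "b", "c"), 7)); // {a=3, b=2, c=2}
    }
}
```

Whatever ordering is used for the extras, the quotas always sum to the total partition count, which is what makes the "balance" property the highest-priority one in the description.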
[GitHub] [kafka] kamalcph commented on a diff in pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)
kamalcph commented on code in PR #14329: URL: https://github.com/apache/kafka/pull/14329#discussion_r1315134679 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -379,9 +379,9 @@ public void stopPartitions(Set topicPartitions, LOGGER.error("Error while stopping the partition: {}, delete: {}", tpId.topicPartition(), delete, ex); } }); -remoteLogMetadataManager.onStopPartitions(topicIdPartitions); if (delete) { // NOTE: this#stopPartitions method is called when Replica state changes to Offline and ReplicaDeletionStarted +remoteLogMetadataManager.onStopPartitions(topicIdPartitions); Review Comment: Yes, you're right. When the replica is moved to another node, `RLMM#stopPartition` won't be called. We can handle this case in TBRLMM by adding a test to ensure that it is handled gracefully if the same replica is moved back to the previous node. However, we need to handle this case for any RLMM implementation. Another way to fix this is to always invoke `RLMM#stopPartition` when `deleteLocalLog` is set to true, but that requires a good amount of changes.
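The behavioural change in the diff above can be sketched in isolation: the metadata-manager callback moves inside the `if (delete)` branch, so it no longer fires when partitions are stopped without deletion. The interface `MetadataManager` and the class `StopPartitionsSketch` below are hypothetical stand-ins written for this illustration; they are not the real `RemoteLogManager` or RLMM types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class StopPartitionsSketch {
    // Hypothetical stand-in for the metadata manager callback.
    interface MetadataManager {
        void onStopPartitions(List<String> partitions);
    }

    static void stopPartitions(List<String> partitions, boolean delete, MetadataManager rlmm) {
        // Per-partition task cancellation would happen here regardless of `delete`.
        if (delete) {
            // After the change, the callback is only invoked on the delete path.
            rlmm.onStopPartitions(partitions);
        }
    }

    public static void main(String[] args) {
        List<String> notified = new ArrayList<>();
        stopPartitions(Arrays.asList("topicA-0"), false, notified::addAll);
        System.out.println(notified.size()); // 0: no callback without delete
        stopPartitions(Arrays.asList("topicA-0"), true, notified::addAll);
        System.out.println(notified.size()); // 1: callback on delete
    }
}
```

The review thread's concern maps onto the first call: a replica that is moved away without `delete` leaves the metadata manager un-notified, so an implementation has to cope with the same replica coming back later.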
[jira] [Updated] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15388: - Issue Type: Bug (was: Task) > Handle topics that were having compaction as retention earlier are changed to > delete only retention policy and onboarded to tiered storage. > > > Key: KAFKA-15388 > URL: https://issues.apache.org/jira/browse/KAFKA-15388 > Project: Kafka > Issue Type: Bug >Reporter: Satish Duggana >Priority: Major > Fix For: 3.7.0 > > > Context: https://github.com/apache/kafka/pull/13561#discussion_r1300055517 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15420) Kafka Tiered Storage V1
[ https://issues.apache.org/jira/browse/KAFKA-15420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15420: - Labels: KIP-405 (was: ) > Kafka Tiered Storage V1 > --- > > Key: KAFKA-15420 > URL: https://issues.apache.org/jira/browse/KAFKA-15420 > Project: Kafka > Issue Type: Improvement >Reporter: Satish Duggana >Assignee: Satish Duggana >Priority: Major > Labels: KIP-405 > Fix For: 3.7.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761859#comment-17761859 ] Divij Vaidya commented on KAFKA-15388: -- Hey [~satish.duggana] If supporting TS on historically compacted topics is not in scope for 3.6 (blocked by this ticket), then we should update the [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Tiered+Storage+Early+Access+Release+Notes] with this information. What do you think? Also, note that paths other than delete (for example, read) are impacted as well. For example, this line of code assumes that offsets are contiguous: https://github.com/apache/kafka/blob/5785796f985aa294c12e670da221d086a7fa887c/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1326C4-L1326C4 > Handle topics that were having compaction as retention earlier are changed to > delete only retention policy and onboarded to tiered storage. > > > Key: KAFKA-15388 > URL: https://issues.apache.org/jira/browse/KAFKA-15388 > Project: Kafka > Issue Type: Task >Reporter: Satish Duggana >Priority: Major > Fix For: 3.7.0 > > > Context: https://github.com/apache/kafka/pull/13561#discussion_r1300055517
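To make the contiguity concern concrete: log compaction removes superseded records, so the surviving offsets in a previously compacted topic can have gaps. The sketch below is only an illustration of that property; `OffsetGapSketch` and `isContiguous` are hypothetical names and do not correspond to anything in `RemoteLogManager`.

```java
final class OffsetGapSketch {
    // Returns true when each offset is exactly one greater than the previous one.
    static boolean isContiguous(long[] offsets) {
        for (int i = 1; i < offsets.length; i++) {
            if (offsets[i] != offsets[i - 1] + 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long[] deleteOnly = {0L, 1L, 2L, 3L}; // no records were ever compacted away
        long[] compacted = {0L, 3L, 4L, 9L};  // gaps left behind by compaction
        System.out.println(isContiguous(deleteOnly)); // true
        System.out.println(isContiguous(compacted));  // false
    }
}
```

Any code that derives a position as "previous offset + 1" holds for the first array but not for the second, which is the kind of assumption the comment is pointing at.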
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)
divijvaidya commented on code in PR #14329: URL: https://github.com/apache/kafka/pull/14329#discussion_r1315061717 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -379,9 +379,9 @@ public void stopPartitions(Set topicPartitions, LOGGER.error("Error while stopping the partition: {}, delete: {}", tpId.topicPartition(), delete, ex); } }); -remoteLogMetadataManager.onStopPartitions(topicIdPartitions); if (delete) { // NOTE: this#stopPartitions method is called when Replica state changes to Offline and ReplicaDeletionStarted +remoteLogMetadataManager.onStopPartitions(topicIdPartitions); Review Comment: > Stop partition request will be called when a topic is deleted, partition is moved to another replica and node is stopped. A fourth case in the future will be when TS is disabled dynamically for a topic (KIP-950). In such a scenario, TBRLMM will probably still have a problem if the disablement has a "retain=delete". We can solve that within the scope of KIP-950, but as a heads-up, some changes in TBRLMM will be required. > In the first 2 cases, the request will have delete flag set to true. When a replica is moving to another node, why would `deleteRemoteLog` be true? `deleteLocalLog` will be true, but not `deleteRemoteLog`. Please help me understand this.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)
divijvaidya commented on code in PR #14329: URL: https://github.com/apache/kafka/pull/14329#discussion_r1314961838 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java: ## @@ -438,10 +440,19 @@ private void initializeResources() { lock.writeLock().unlock(); } } +} finally { +if (adminClient != null) { +try { +adminClient.close(Duration.ofSeconds(10)); Review Comment: Could we use `Utils.closeQuietly` here?
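For readers unfamiliar with the pattern being suggested, a close-quietly helper replaces the null check plus try/catch around `close()` with a single call that swallows and logs any failure. The sketch below is a self-contained stand-in written for illustration; Kafka's own `Utils.closeQuietly` may differ in signature and logging, and `FakeClient` is a hypothetical type standing in for an admin client with a timed close.

```java
import java.time.Duration;

final class CloseQuietlySketch {
    // Closes the resource if present; failures are reported but never rethrown.
    static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Throwable t) {
            System.err.println("Failed to close " + name + ": " + t);
        }
    }

    // Hypothetical client with a timed close.
    static final class FakeClient {
        boolean closed = false;

        void close(Duration timeout) {
            closed = true;
        }
    }

    public static void main(String[] args) {
        FakeClient client = new FakeClient();
        // A lambda keeps the explicit timeout while still using the helper.
        closeQuietly(() -> client.close(Duration.ofSeconds(10)), "admin client");
        // An exception from close() is logged to stderr, not propagated.
        closeQuietly(() -> { throw new IllegalStateException("boom"); }, "broken resource");
        // A null resource is ignored.
        closeQuietly(null, "absent resource");
        System.out.println(client.closed); // true
    }
}
```

One design point worth noting: passing the client directly would call its no-argument `close()`, so wrapping the timed close in a lambda is one way to keep the 10-second bound from the diff.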
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)
divijvaidya commented on code in PR #14307: URL: https://github.com/apache/kafka/pull/14307#discussion_r1315045974 ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java: ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +public final class EnableRemoteLogOnTopicTest extends TieredStorageTestHarness { + +@Override +public int brokerCount() { +return 2; +} + +@Override +protected void writeTestSpecifications(TieredStorageTestBuilder builder) { +final Integer broker0 = 0; +final Integer broker1 = 1; +final String topicA = "topicA"; +final Integer p0 = 0; +final Integer p1 = 1; +final Integer partitionCount = 2; +final Integer replicationFactor = 2; +final Integer maxBatchCountPerSegment = 1; +final boolean enableRemoteLogStorage = false; +final Map> assignment = mkMap( +mkEntry(p0, Arrays.asList(broker0, broker1)), +mkEntry(p1, Arrays.asList(broker1, broker0)) +); + +builder +.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, assignment, +enableRemoteLogStorage) +// send records to partition 0 +.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 0L) +.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2")) +// send records to partition 1 +.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 0L) +.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2")) +// enable remote log storage +.updateTopicConfig(topicA, + Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), +Collections.emptyList()) +// produce some more records to partition 0 +// Note that the segment 0-2 gets offloaded for p0, but we 
cannot expect those events deterministically +// because the rlm-task-thread runs in background and this framework doesn't support it. +.expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new KeyValueSpec("k3", "v3")) +.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 4L) +.produce(topicA, p0, new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4")) +// produce some more records to partition 1 +// Note that the segment 0-2 gets offloaded for p1, but we cannot expect those events deterministically +// because the rlm-task-thread runs in background and this framework doesn't support it. +.expectSegmentToBeOffloaded(broker1, topicA, p1, 3, new KeyValueSpec("k3", "v3")) Review Comment: ack -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15431) Add support to assert offloaded segment for already produced event in Tiered Storage Framework
[ https://issues.apache.org/jira/browse/KAFKA-15431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15431: - Parent: (was: KAFKA-15420) Issue Type: Task (was: Sub-task) > Add support to assert offloaded segment for already produced event in Tiered > Storage Framework > -- > > Key: KAFKA-15431 > URL: https://issues.apache.org/jira/browse/KAFKA-15431 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Priority: Major > > See > [comment|https://github.com/apache/kafka/pull/14307#discussion_r1314943942] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] kamalcph commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)
kamalcph commented on code in PR #14307: URL: https://github.com/apache/kafka/pull/14307#discussion_r1315003306 ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +public final class PartitionsExpandTest extends TieredStorageTestHarness { + +@Override +public int brokerCount() { +return 2; +} + +@Override +protected void writeTestSpecifications(TieredStorageTestBuilder builder) { +final Integer broker0 = 0; +final Integer broker1 = 1; +final String topicA = "topicA"; +final Integer p0 = 0; +final Integer p1 = 1; +final Integer p2 = 2; +final Integer partitionCount = 1; +final Integer replicationFactor = 2; +final Integer maxBatchCountPerSegment = 1; +final boolean enableRemoteLogStorage = true; +final List p0Assignment = Arrays.asList(broker0, broker1); +final List p1Assignment = Arrays.asList(broker0, broker1); +final List p2Assignment = Arrays.asList(broker1, broker0); + +builder +.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, +Collections.singletonMap(p0, p0Assignment), enableRemoteLogStorage) +// produce events to partition 0 +.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0")) +.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1")) +.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L) +.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2")) +// expand the topicA partition-count to 3 +.createPartitions(topicA, 3, mkMap(mkEntry(p1, p1Assignment), mkEntry(p2, p2Assignment))) +// consume from the beginning of the topic to read data from local and remote storage for partition 0 +.expectFetchFromTieredStorage(broker0, topicA, 
p0, 2) +.consume(topicA, p0, 0L, 3, 2) + +.expectLeader(topicA, p1, broker0, false) +.expectLeader(topicA, p2, broker1, false) + +// produce events to partition 1 +.expectSegmentToBeOffloaded(broker0, topicA, p1, 0, new KeyValueSpec("k0", "v0")) +.expectSegmentToBeOffloaded(broker0, topicA, p1, 1, new KeyValueSpec("k1", "v1")) +.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 2L) +.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2")) + +// produce events to partition 2 +.expectSegmentToBeOffloaded(broker1, topicA, p2, 0, new KeyValueSpec("k0", "v0")) +.expectSegmentToBeOffloaded(broker1, topicA, p2, 1, new KeyValueSpec("k1", "v1")) +.expectEarliestLocalOffsetInLogDirectory(topicA, p2, 2L) +.produce(topicA, p2, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2")) + +// produce some more events to partition 0 and expect the segments to be offloaded +// NOTE: Support needs to be added to capture the offloaded segment event for already sent message (k2, v2) Review Comment: Filed KAFKA-15431 to track this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
[jira] [Updated] (KAFKA-15431) Add support to assert offloaded segment for already produced event in Tiered Storage Framework
[ https://issues.apache.org/jira/browse/KAFKA-15431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15431: - Parent: KAFKA-15420 Issue Type: Sub-task (was: Task) > Add support to assert offloaded segment for already produced event in Tiered > Storage Framework > -- > > Key: KAFKA-15431 > URL: https://issues.apache.org/jira/browse/KAFKA-15431 > Project: Kafka > Issue Type: Sub-task >Reporter: Kamal Chandraprakash >Priority: Major > > See > [comment|https://github.com/apache/kafka/pull/14307#discussion_r1314943942] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15431) Add support to assert offloaded segment for already produced event in Tiered Storage Framework
Kamal Chandraprakash created KAFKA-15431: Summary: Add support to assert offloaded segment for already produced event in Tiered Storage Framework Key: KAFKA-15431 URL: https://issues.apache.org/jira/browse/KAFKA-15431 Project: Kafka Issue Type: Task Reporter: Kamal Chandraprakash See [comment|https://github.com/apache/kafka/pull/14307#discussion_r1314943942] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] kamalcph commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)
kamalcph commented on code in PR #14307: URL: https://github.com/apache/kafka/pull/14307#discussion_r1314999348 ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java: ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +public final class EnableRemoteLogOnTopicTest extends TieredStorageTestHarness { + +@Override +public int brokerCount() { +return 2; +} + +@Override +protected void writeTestSpecifications(TieredStorageTestBuilder builder) { +final Integer broker0 = 0; +final Integer broker1 = 1; +final String topicA = "topicA"; +final Integer p0 = 0; +final Integer p1 = 1; +final Integer partitionCount = 2; +final Integer replicationFactor = 2; +final Integer maxBatchCountPerSegment = 1; +final boolean enableRemoteLogStorage = false; +final Map> assignment = mkMap( +mkEntry(p0, Arrays.asList(broker0, broker1)), +mkEntry(p1, Arrays.asList(broker1, broker0)) +); + +builder +.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, assignment, +enableRemoteLogStorage) +// send records to partition 0 +.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 0L) +.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2")) +// send records to partition 1 +.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 0L) +.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2")) +// enable remote log storage +.updateTopicConfig(topicA, + Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), +Collections.emptyList()) +// produce some more records to partition 0 +// Note that the segment 0-2 gets offloaded for p0, but we 
cannot expect those events deterministically +// because the rlm-task-thread runs in background and this framework doesn't support it. +.expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new KeyValueSpec("k3", "v3")) +.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 4L) +.produce(topicA, p0, new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4")) +// produce some more records to partition 1 +// Note that the segment 0-2 gets offloaded for p1, but we cannot expect those events deterministically +// because the rlm-task-thread runs in background and this framework doesn't support it. +.expectSegmentToBeOffloaded(broker1, topicA, p1, 3, new KeyValueSpec("k3", "v3")) Review Comment: The 4th argument is the base offset of the segment, and the 5th is the contents of that segment, passed as `varargs`. The test asserts that the segment contains only the expected messages.
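The shape described in the comment (a base offset followed by a `varargs` list of expected records) can be sketched as below. This is a simplified, hypothetical analogue: `SegmentExpectationSketch`, `expectSegment`, and `matches` are names invented here, records are plain strings rather than `KeyValueSpec`, and the real builder method takes additional broker, topic, and partition arguments.

```java
import java.util.Arrays;
import java.util.List;

final class SegmentExpectationSketch {
    final long baseOffset;
    final List<String> records;

    private SegmentExpectationSketch(long baseOffset, List<String> records) {
        this.baseOffset = baseOffset;
        this.records = records;
    }

    // The trailing varargs parameter collects every expected record of the segment.
    static SegmentExpectationSketch expectSegment(long baseOffset, String... records) {
        return new SegmentExpectationSketch(baseOffset, Arrays.asList(records));
    }

    // True only when the segment starts at the expected offset and holds
    // exactly the expected records, in order, with nothing extra.
    boolean matches(long actualBaseOffset, List<String> actualRecords) {
        return baseOffset == actualBaseOffset && records.equals(actualRecords);
    }

    public static void main(String[] args) {
        SegmentExpectationSketch expectation = expectSegment(3L, "k3=v3");
        System.out.println(expectation.matches(3L, Arrays.asList("k3=v3")));          // true
        System.out.println(expectation.matches(3L, Arrays.asList("k3=v3", "k4=v4"))); // false
    }
}
```

With one batch per segment, as in the test's `maxBatchCountPerSegment = 1`, each expectation lists a single record, which is why the calls in the diff pass exactly one `KeyValueSpec` after the base offset.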
[GitHub] [kafka] kamalcph commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)
kamalcph commented on code in PR #14307: URL: https://github.com/apache/kafka/pull/14307#discussion_r1314999348 ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java: ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +public final class EnableRemoteLogOnTopicTest extends TieredStorageTestHarness { + +@Override +public int brokerCount() { +return 2; +} + +@Override +protected void writeTestSpecifications(TieredStorageTestBuilder builder) { +final Integer broker0 = 0; +final Integer broker1 = 1; +final String topicA = "topicA"; +final Integer p0 = 0; +final Integer p1 = 1; +final Integer partitionCount = 2; +final Integer replicationFactor = 2; +final Integer maxBatchCountPerSegment = 1; +final boolean enableRemoteLogStorage = false; +final Map<Integer, List<Integer>> assignment = mkMap( +mkEntry(p0, Arrays.asList(broker0, broker1)), +mkEntry(p1, Arrays.asList(broker1, broker0)) +); + +builder +.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, assignment, +enableRemoteLogStorage) +// send records to partition 0 +.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 0L) +.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2")) +// send records to partition 1 +.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 0L) +.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2")) +// enable remote log storage +.updateTopicConfig(topicA, + Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), +Collections.emptyList()) +// produce some more records to partition 0 +// Note that the segment 0-2 gets offloaded for p0, but we 
cannot expect those events deterministically +// because the rlm-task-thread runs in background and this framework doesn't support it. +.expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new KeyValueSpec("k3", "v3")) +.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 4L) +.produce(topicA, p0, new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4")) +// produce some more records to partition 1 +// Note that the segment 0-2 gets offloaded for p1, but we cannot expect those events deterministically +// because the rlm-task-thread runs in background and this framework doesn't support it. +.expectSegmentToBeOffloaded(broker1, topicA, p1, 3, new KeyValueSpec("k3", "v3")) Review Comment: The 4th parameter is the base-offset of the segment and the 5th parameter is the contents of that segment. The test asserts that the segment contains only the expected messages. The 5th parameter is a `varargs` parameter.
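A minimal sketch of the parameter layout described in the review comment above, assuming hypothetical stand-in classes (`KeyValueSpecSketch`, `OffloadExpectationSketch`) rather than the real test builder. It only illustrates how a trailing `varargs` parameter lets the caller list the expected records of a segment after its base offset.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the key/value spec used in the tests (illustration only).
final class KeyValueSpecSketch {
    final String key;
    final String value;

    KeyValueSpecSketch(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical holder for one "segment offloaded" expectation (not the real builder API).
final class OffloadExpectationSketch {
    final int brokerId;
    final String topic;
    final int partition;
    final int baseOffset;                    // 4th parameter: base offset of the segment
    final List<KeyValueSpecSketch> records;  // 5th parameter: expected contents of the segment

    private OffloadExpectationSketch(int brokerId, String topic, int partition,
                                     int baseOffset, List<KeyValueSpecSketch> records) {
        this.brokerId = brokerId;
        this.topic = topic;
        this.partition = partition;
        this.baseOffset = baseOffset;
        this.records = records;
    }

    // The trailing varargs parameter accepts zero or more expected records.
    static OffloadExpectationSketch expectSegmentToBeOffloaded(int brokerId, String topic, int partition,
                                                               int baseOffset, KeyValueSpecSketch... records) {
        return new OffloadExpectationSketch(brokerId, topic, partition, baseOffset, Arrays.asList(records));
    }
}
```

With this shape, a call such as `expectSegmentToBeOffloaded(1, "topicA", 1, 3, new KeyValueSpecSketch("k3", "v3"))` reads as: the segment starting at offset 3 must contain exactly one record, k3/v3.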
[GitHub] [kafka] dajac commented on a diff in pull request #14271: KAFKA-14503: Implement ListGroups
dajac commented on code in PR #14271: URL: https://github.com/apache/kafka/pull/14271#discussion_r1314944364 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -306,6 +308,19 @@ public CoordinatorResult commitOffset( return offsetMetadataManager.commitOffset(context, request); } +/** + * Handles a ListGroups request. + * + * @param statesFilter The states of the groups we want to list. If empty all groups are returned with their state. + * @return A Result containing the ListGroupsResponseData response + */ +public ListGroupsResponseData listGroups( +List statesFilter, +long committedOffset +) throws ApiException { +return new ListGroupsResponseData().setGroups(groupMetadataManager.listGroups(statesFilter, committedOffset)); +} Review Comment: I still think that we should rather return the list of groups here and create `ListGroupsResponseData` one level up. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -428,9 +432,51 @@ public CompletableFuture listGroups( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." 
-)); +List> futures = new ArrayList<>(); +for (TopicPartition tp : runtime.partitions()) { +futures.add(runtime.scheduleReadOperation( +"list-groups", +tp, +(coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), lastCommittedOffset) +).exceptionally(exception -> { +if (!(exception instanceof KafkaException)) { +log.error("ListGroups request {} hit an unexpected exception: {}", +request, exception.getMessage()); +throw new RuntimeException(exception); +} +if (exception instanceof CoordinatorLoadInProgressException) { +throw new RuntimeException(exception); +} else if (exception instanceof NotCoordinatorException) { +log.warn("ListGroups request {} hit a NotCoordinatorException exception: {}", +request, exception.getMessage()); +return new ListGroupsResponseData().setGroups(Collections.emptyList()); +} else { +return new ListGroupsResponseData().setErrorCode(Errors.forException(exception).code()); +} +})); +} +CompletableFuture responseFuture = new CompletableFuture<>(); +List listedGroups = new ArrayList<>(); +AtomicInteger succeedFutureCount = new AtomicInteger(); +FutureUtils.drainFutures(futures, (data, t) -> { +synchronized (runtime) { +if (t != null) { +responseFuture.completeExceptionally(new UnknownServerException(t.getMessage())); +} else { +if (data.errorCode() != Errors.NONE.code()) { +if (!responseFuture.isDone()) { +responseFuture.complete(data); +} +} else { +listedGroups.addAll(data.groups()); +if (succeedFutureCount.addAndGet(1) == runtime.partitions().size()) { +responseFuture.complete(new ListGroupsResponseData().setGroups(listedGroups)); +} +} +} +} +}); +return responseFuture; Review Comment: There are a few issues with this code. 1. Synchronising on `runtime` will create lock contention across all the callers of `listGroups`. We should rather use a local variable. 2. The error handling seems error prone to me. 
For instance, `NotCoordinatorException` exceptions are turned into `RuntimeException` exceptions and then turned into `UnknownServerException` if I understood it correctly. We lose the semantics along the way. I think that we could take your idea further and combine the two main steps into one. I am thinking about something like this: ``` final List partitions = new ArrayList<>(runtime.partitions()); final CompletableFuture future = new CompletableFuture<>(); final List results = new ArrayList<>(); final AtomicInteger cnt = new AtomicInteger(partitions.size()); for (TopicPartition partition : partitions) { runtime.scheduleReadOperation( "list-group", partition, (coordinator, lastCommittedOffset) ->
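The suggestion in the message above is cut off in this archive, so the following is not a reconstruction of it. It is a minimal sketch, using only `java.util.concurrent`, of the two points the review raises: guard the shared result list with a local object instead of a shared `runtime`, and propagate the first failure with its original exception type. `FanOutSketch` and `combine` are hypothetical names, and each input future stands in for one per-partition read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of Kafka: merges per-partition results into one future.
final class FanOutSketch {

    static <T> CompletableFuture<List<T>> combine(List<CompletableFuture<List<T>>> perPartition) {
        final CompletableFuture<List<T>> result = new CompletableFuture<>();
        final List<T> merged = new ArrayList<>();  // local to this call, so locking it causes no cross-caller contention
        final AtomicInteger remaining = new AtomicInteger(perPartition.size());

        if (perPartition.isEmpty()) {
            result.complete(merged);
            return result;
        }

        for (CompletableFuture<List<T>> future : perPartition) {
            future.whenComplete((items, error) -> {
                if (error != null) {
                    // Fail fast and keep the original exception so its meaning is not lost.
                    result.completeExceptionally(error);
                    return;
                }
                synchronized (merged) {
                    merged.addAll(items);
                }
                // The last successful partition completes the combined future.
                if (remaining.decrementAndGet() == 0) {
                    synchronized (merged) {
                        result.complete(new ArrayList<>(merged));
                    }
                }
            });
        }
        return result;
    }
}
```

If any input fails, later completions are ignored because a `CompletableFuture` can only be completed once. How specific errors (for example, a partition that is no longer owned) should be mapped is a design decision for the PR and is deliberately left out of this sketch.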
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)
divijvaidya commented on code in PR #14307: URL: https://github.com/apache/kafka/pull/14307#discussion_r1314943942 ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +public final class PartitionsExpandTest extends TieredStorageTestHarness { + +@Override +public int brokerCount() { +return 2; +} + +@Override +protected void writeTestSpecifications(TieredStorageTestBuilder builder) { +final Integer broker0 = 0; +final Integer broker1 = 1; +final String topicA = "topicA"; +final Integer p0 = 0; +final Integer p1 = 1; +final Integer p2 = 2; +final Integer partitionCount = 1; +final Integer replicationFactor = 2; +final Integer maxBatchCountPerSegment = 1; +final boolean enableRemoteLogStorage = true; +final List<Integer> p0Assignment = Arrays.asList(broker0, broker1); +final List<Integer> p1Assignment = Arrays.asList(broker0, broker1); +final List<Integer> p2Assignment = Arrays.asList(broker1, broker0); + +builder +.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, +Collections.singletonMap(p0, p0Assignment), enableRemoteLogStorage) +// produce events to partition 0 +.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0")) +.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1")) +.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L) +.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2")) +// expand the topicA partition-count to 3 +.createPartitions(topicA, 3, mkMap(mkEntry(p1, p1Assignment), mkEntry(p2, p2Assignment))) +// consume from the beginning of the topic to read data from local and remote storage for partition 0 +.expectFetchFromTieredStorage(broker0, topicA, 
p0, 2) +.consume(topicA, p0, 0L, 3, 2) + +.expectLeader(topicA, p1, broker0, false) +.expectLeader(topicA, p2, broker1, false) + +// produce events to partition 1 +.expectSegmentToBeOffloaded(broker0, topicA, p1, 0, new KeyValueSpec("k0", "v0")) +.expectSegmentToBeOffloaded(broker0, topicA, p1, 1, new KeyValueSpec("k1", "v1")) +.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 2L) +.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2")) + +// produce events to partition 2 +.expectSegmentToBeOffloaded(broker1, topicA, p2, 0, new KeyValueSpec("k0", "v0")) +.expectSegmentToBeOffloaded(broker1, topicA, p2, 1, new KeyValueSpec("k1", "v1")) +.expectEarliestLocalOffsetInLogDirectory(topicA, p2, 2L) +.produce(topicA, p2, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2")) + +// produce some more events to partition 0 and expect the segments to be offloaded +// NOTE: Support needs to be added to capture the offloaded segment event for already sent message (k2, v2) Review Comment: We risk forgetting this commented line. Please add a JIRA. ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java: ## @@ -0,0 +1,89 @@
[GitHub] [kafka] dopuskh3 closed pull request #14332: [MINOR] Fix TopicPartition comparison
dopuskh3 closed pull request #14332: [MINOR] Fix TopicPartition comparison URL: https://github.com/apache/kafka/pull/14332
[jira] [Updated] (KAFKA-15430) Kafla create replca partition on controller node
[ https://issues.apache.org/jira/browse/KAFKA-15430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrii Vysotskiy updated KAFKA-15430: - Priority: Major (was: Minor) > Kafla create replca partition on controller node > > > Key: KAFKA-15430 > URL: https://issues.apache.org/jira/browse/KAFKA-15430 > Project: Kafka > Issue Type: Test > Components: kraft >Affects Versions: 3.5.1 >Reporter: Andrii Vysotskiy >Priority: Major > > I have a 5-node configuration (KRaft mode) with the following roles: 4 > broker+controller and 1 controller. I create a topic with replication factor 5; > it is created, and describe shows that the topic partition has 5 replicas. > > {{/opt/kafka/latest/bin/kafka-topics.sh --create > --bootstrap-server=dc1-prod-kafka-001-vs:9092 --replication-factor 5 > --partitions 1 --topic test5}} > > {{/opt/kafka/latest/bin/kafka-topics.sh --describe --topic test5 > --bootstrap-server=dc1-prod-kafka-001-vs:9092 > Topic: test5 TopicId: amuqr8EgRmqeKryUHZwsMA PartitionCount: 1 > ReplicationFactor: 5 Configs: segment.bytes=1073741824 > Topic: test5 Partition: 0 Leader: 3 Replicas: 3,4,1,2,5 Isr: 3,4,1,2}} > > There are 5 replicas but only 4 in the ISR. Why does Kafka initially allow a replica to be assigned to > the controller-only node, although in reality the replica is not created on the > controller node and there are no topic files in its log directory? > Is this expected behavior or not? Thanks. > I want to understand whether such behavior is normal for Kafka.
[GitHub] [kafka] dengziming commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers
dengziming commented on code in PR #14306: URL: https://github.com/apache/kafka/pull/14306#discussion_r1314913328 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -320,6 +323,19 @@ public short registerBrokerRecordVersion() { } } +public short registerControllerRecordVersion() { +if (isAtLeast(MetadataVersion.IBP_3_6_IV2)) { Review Comment: We are not using `IBP_3_7_IV0`. Does this mean an old-version controller can register a controller with a new-version controller?
[GitHub] [kafka] satishd commented on pull request #14332: [MINOR] Fix TopicPartition comparison
satishd commented on PR #14332: URL: https://github.com/apache/kafka/pull/14332#issuecomment-1705221588 Thanks @dopuskh3 for catching and raising the PR. As @showuon mentioned, it is addressed by [PR](https://github.com/apache/kafka/pull/14307/files#diff-380e4d8859ea9148f21794c09039425c82d9012a392c2dbbe1ce2ec8677a1970R304)
[GitHub] [kafka] kamalcph commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)
kamalcph commented on code in PR #14307: URL: https://github.com/apache/kafka/pull/14307#discussion_r1314900757 ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT; + +public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness { + +@Override +public int brokerCount() { +return 1; +} + +@Override +protected void writeTestSpecifications(TieredStorageTestBuilder builder) { +final Integer broker0 = 0; +final String topicA = "topicA"; +final Integer p0 = 0; +final Integer partitionCount = 1; +final Integer replicationFactor = 1; +final Integer maxBatchCountPerSegment = 1; +final Map<Integer, List<Integer>> replicaAssignment = null; +final boolean enableRemoteLogStorage = true; +final int beginEpoch = 0; +final long startOffset = 3; + +// Create topicA with 1 partition, 1 RF and enabled with remote storage. 
+builder.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, replicaAssignment, +enableRemoteLogStorage) +// produce events to partition 0 and expect 3 segments to be offloaded +.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0")) +.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1")) +.expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2")) +.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L) +.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3")) +// update the topic config such that it triggers the deletion of segments +.updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList()) Review Comment: The default retention time is 7 days and the default retention bytes is unlimited, so the config update is required to trigger the deletion.
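A minimal sketch of why the config update matters, assuming a simplified time-based eligibility check that is not the broker's actual retention logic. The body of `configsToBeAdded()` is not shown in this thread, so the override below (`retention.ms` set to 1) is a hypothetical example. The topic config names `retention.ms` and `retention.bytes` and their defaults (7 days, and -1 for unlimited) are the standard Kafka ones.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical illustration class, not part of the test framework.
final class RetentionConfigSketch {
    static final String RETENTION_MS = "retention.ms";
    static final long DEFAULT_RETENTION_MS = 7L * 24 * 60 * 60 * 1000; // 7 days
    static final long DEFAULT_RETENTION_BYTES = -1L;                   // -1 means unlimited

    // Hypothetical override that a retention-time test could return.
    static Map<String, String> configsToBeAdded() {
        return Collections.singletonMap(RETENTION_MS, "1");
    }

    // Simplified assumption: a segment is eligible once it is older than retention.ms
    // (a negative retention.ms disables time-based deletion).
    static boolean eligibleByTime(long segmentAgeMs, long retentionMs) {
        return retentionMs >= 0 && segmentAgeMs > retentionMs;
    }
}
```

Under the defaults, a segment written seconds ago is far from eligible, so a test would never observe a deletion; lowering the retention makes the freshly offloaded segments eligible almost immediately.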
[GitHub] [kafka] dopuskh3 commented on pull request #14332: [MINOR] Fix TopicPartition comparison
dopuskh3 commented on PR #14332: URL: https://github.com/apache/kafka/pull/14332#issuecomment-1705208040 cc: @satishd @jeqo
[GitHub] [kafka] dopuskh3 opened a new pull request, #14332: [MINOR] Fix TopicPartition comparison
dopuskh3 opened a new pull request, #14332: URL: https://github.com/apache/kafka/pull/14332 This only affects logging.
[GitHub] [kafka] clolov commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)
clolov commented on code in PR #14307: URL: https://github.com/apache/kafka/pull/14307#discussion_r1314892554 ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT; + +public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness { + +@Override +public int brokerCount() { +return 1; +} + +@Override +protected void writeTestSpecifications(TieredStorageTestBuilder builder) { +final Integer broker0 = 0; +final String topicA = "topicA"; +final Integer p0 = 0; +final Integer partitionCount = 1; +final Integer replicationFactor = 1; +final Integer maxBatchCountPerSegment = 1; +final Map> replicaAssignment = null; +final boolean enableRemoteLogStorage = true; +final int beginEpoch = 0; +final long startOffset = 3; + +// Create topicA with 1 partition, 1 RF and enabled with remote storage. +builder.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, replicaAssignment, +enableRemoteLogStorage) +// produce events to partition 0 and expect 3 segments to be offloaded +.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0")) +.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1")) +.expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2")) +.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L) +.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3")) +// update the topic config such that it triggers the deletion of segments +.updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList()) Review Comment: For my understanding, why do we need this configuration update? 
How would it trigger the segment deletion?
[GitHub] [kafka] kamalcph commented on a diff in pull request #14328: KAFKA-15410: Reassign replica expand, move and shrink integration tests (2/4)
kamalcph commented on code in PR #14328: URL: https://github.com/apache/kafka/pull/14328#discussion_r1314897616 ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaShrinkTest.java: ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +public final class ReassignReplicaShrinkTest extends TieredStorageTestHarness { + +/** + * Cluster of two brokers + * @return number of brokers in the cluster + */ +@Override +public int brokerCount() { +return 2; +} + +/** + * Number of partitions in the '__remote_log_metadata' topic + * @return number of partitions in the '__remote_log_metadata' topic + */ +@Override +public int numRemoteLogMetadataPartitions() { +return 2; +} + +@Override +protected void writeTestSpecifications(TieredStorageTestBuilder builder) { +final Integer broker0 = 0; +final Integer broker1 = 1; +final String topicA = "topicA"; +final Integer p0 = 0; +final Integer p1 = 1; +final Integer partitionCount = 2; +final Integer replicationFactor = 2; +final Integer maxBatchCountPerSegment = 1; +final boolean enableRemoteLogStorage = true; +final Map<Integer, List<Integer>> replicaAssignment = mkMap( +mkEntry(p0, Arrays.asList(broker0, broker1)), +mkEntry(p1, Arrays.asList(broker1, broker0)) +); + +builder +// create topicA with 2 partitions and 2 RF +.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, +replicaAssignment, enableRemoteLogStorage) +// send records to partition 0, expect that the segments are uploaded and removed from local log dir +.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0")) +.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1")) +.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L) +.produce(topicA, p0, new KeyValueSpec("k0", "v0"), 
new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2")) +// send records to partition 1, expect that the segments are uploaded and removed from local log dir +.expectSegmentToBeOffloaded(broker1, topicA, p1, 0, new KeyValueSpec("k0", "v0")) +.expectSegmentToBeOffloaded(broker1, topicA, p1, 1, new KeyValueSpec("k1", "v1")) +.expectEarliestLocalOffsetInLogDirectory(topicA, p1, 2L) +.produce(topicA, p1, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2")) +// shrink the replication factor to 1 +.shrinkReplica(topicA, p0, Collections.singletonList(broker1)) +.shrinkReplica(topicA, p1, Collections.singletonList(broker0)) +.expectLeader(topicA, p0, broker1, false) Review Comment: yes, we are asserting the same.
[GitHub] [kafka] kamalcph commented on a diff in pull request #14328: KAFKA-15410: Reassign replica expand, move and shrink integration tests (2/4)
kamalcph commented on code in PR #14328: URL: https://github.com/apache/kafka/pull/14328#discussion_r1314896591 ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/ReassignReplicaExpandTest.java: ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +public class ReassignReplicaExpandTest extends TieredStorageTestHarness { + +protected final Integer broker0 = 0; +protected final Integer broker1 = 1; + +/** + * Cluster of two brokers + * @return number of brokers in the cluster + */ +@Override +public int brokerCount() { +return 2; +} + +/** + * Number of partitions in the '__remote_log_metadata' topic + * @return number of partitions in the '__remote_log_metadata' topic + */ +@Override +public int numRemoteLogMetadataPartitions() { +return 2; +} + +@Override +protected void writeTestSpecifications(TieredStorageTestBuilder builder) { +final String topicA = "topicA"; +final String topicB = "topicB"; +final Integer p0 = 0; +final Integer partitionCount = 5; +final Integer replicationFactor = 2; +final Integer maxBatchCountPerSegment = 1; +final Map<Integer, List<Integer>> replicaAssignment = null; +final boolean enableRemoteLogStorage = true; +final List<Integer> metadataPartitions = new ArrayList<>(); +for (int i = 0; i < numRemoteLogMetadataPartitions(); i++) { +metadataPartitions.add(i); +} + +builder +// create topicA with 5 partitions and 2 RF +.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, Review Comment: yes correct.
[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov updated KAFKA-14133: -- Description: {color:#de350b}There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.{color} Unless stated in brackets the tests are in the streams module. A list of tests which still require to be moved from EasyMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}In Review{color} {color:#00875a}Merged{color} # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo) # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo) # {color:#00875a}KStreamPrintTest{color} (owner: Christo) # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo) # {color:#00875a}MaterializedInternalTest{color} (owner: Christo) # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo) # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo) # {color:#00875a}ClientUtilsTest{color} (owner: Christo) # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: Christo) # {color:#00875a}TopologyTest{color} (owner: Christo) # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo) # 
{color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo) # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo) # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: Christo) # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo) # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo) # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo) # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: Christo) # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo) # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo) # {color:#00875a}RocksDBStoreTest{color} (owner: Christo) # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo) # {color:#ff8b00}TaskManagerTest{color} (owner: Christo) # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo) # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo) # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo) # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo) # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo) # {color:#00875a}AssignmentTestUtils{color} (owner: Christo) # {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew) (takeover: Christo) # {color:#ff8b00}StandbyTaskTest{color} (owner: Matthew) # {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew) # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew) # {color:#ff8b00}StreamThreadTest{color} (owner: Matthew) (takeover: Christo) # {color:#ff8b00}StreamsMetricsImplTest{color} (owner: Dalibor) (Captured in https://issues.apache.org/jira/browse/KAFKA-12947) # {color:#00875a}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: [~shekharrajak]) # 
{color:#00875a}TimeOrderedWindowStoreTest{color} (owner: [~shekharrajak]) [https://github.com/apache/kafka/pull/12777] # {color:#00875a}AbstractStreamTest{color} (owner: Christo) # {color:#00875a}KStreamTransformValuesTest{color} (owner: Christo) # {color:#00875a}KTableImplTest{color} (owner: Christo) # {color:#00875a}KTableTransformValuesTest{color} (owner: Christo) # {color:#00875a}SessionCacheFlushListenerTest{color} (owner: Christo) # {color:#00875a}TimestampedCacheFlushListenerTest{color} (owner: Christo) # {color:#00875a}TimestampedTupleForwarderTest{color} (owner: Christo) # {color:#00875a}ActiveTaskCreatorTest{color} (owner: Christo) # {color:#00875a}ChangelogTopicsTest{color} (owner: Christo) # {color:#00875a}GlobalProcessorContextImplTest{color} (owner: Christo) # {color:#00875a}RecordCollectorTest{color} (owner: Christo) # {color:#00875a}StateRestoreCallbackAdapterTest{color} (owner: Christo) #
[GitHub] [kafka] showuon commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers
showuon commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1314886326

## clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationRequest.java: ##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ControllerRegistrationRequestData;
+import org.apache.kafka.common.message.ControllerRegistrationResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class ControllerRegistrationRequest extends AbstractRequest {
+    public static class Builder extends AbstractRequest.Builder {
+        private final ControllerRegistrationRequestData data;
+
+        public Builder(ControllerRegistrationRequestData data) {
+            super(ApiKeys.BROKER_HEARTBEAT);

Review Comment:
Nice catch, @dengziming !
[GitHub] [kafka] dengziming commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers
dengziming commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1314864521

## clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationRequest.java: ##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ControllerRegistrationRequestData;
+import org.apache.kafka.common.message.ControllerRegistrationResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class ControllerRegistrationRequest extends AbstractRequest {
+    public static class Builder extends AbstractRequest.Builder {
+        private final ControllerRegistrationRequestData data;
+
+        public Builder(ControllerRegistrationRequestData data) {
+            super(ApiKeys.BROKER_HEARTBEAT);

Review Comment:
this should be CONTROLLER_REGISTRATION

## clients/src/main/resources/common/message/ControllerRegistrationRequest.json: ##
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 70,

Review Comment:
we don't have 69, why use 70?
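The first review comment above concerns a builder that registers itself under the wrong API key. The following is a minimal, self-contained sketch of that mismatch and of a check that would catch it. All types here (`ApiKeySketch`, `Key`, `RequestBuilder`, `ControllerRegistrationBuilder`, `matchesSchema`) are hypothetical stand-ins, not the real Kafka classes; the id 70 is the `apiKey` value from the quoted JSON, while the id used for `BROKER_HEARTBEAT` is only an illustrative assumption.

```java
import java.util.Objects;

final class ApiKeySketch {
    /** Hypothetical stand-in for an API-key enum; not org.apache.kafka.common.protocol.ApiKeys. */
    enum Key {
        BROKER_HEARTBEAT(63),        // id assumed for illustration only
        CONTROLLER_REGISTRATION(70); // id taken from the JSON schema quoted in the review

        final int id;

        Key(int id) {
            this.id = id;
        }
    }

    /** Hypothetical base builder: remembers the API key the built request is sent under. */
    abstract static class RequestBuilder {
        private final Key apiKey;

        protected RequestBuilder(Key apiKey) {
            this.apiKey = Objects.requireNonNull(apiKey);
        }

        Key apiKey() {
            return apiKey;
        }
    }

    /** Builder as the reviewer suggests: pass the key that matches the request type. */
    static final class ControllerRegistrationBuilder extends RequestBuilder {
        ControllerRegistrationBuilder() {
            super(Key.CONTROLLER_REGISTRATION); // the reviewed diff passed BROKER_HEARTBEAT here
        }
    }

    /** Returns true when the builder's key id equals the id declared by the message schema. */
    static boolean matchesSchema(RequestBuilder builder, int schemaApiKey) {
        return builder.apiKey().id == schemaApiKey;
    }

    public static void main(String[] args) {
        // Anonymous subclass that reproduces the mistake flagged in the review.
        RequestBuilder buggy = new RequestBuilder(Key.BROKER_HEARTBEAT) { };
        System.out.println("fixed builder matches schema: "
                + matchesSchema(new ControllerRegistrationBuilder(), 70));
        System.out.println("buggy builder matches schema: " + matchesSchema(buggy, 70));
    }
}
```

A check of this shape is cheap to run in a unit test, which is one way a copy-paste error in a `super(...)` call could be caught before review.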
[jira] [Updated] (KAFKA-15293) Update metrics doc to add tiered storage metrics
[ https://issues.apache.org/jira/browse/KAFKA-15293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15293: --- Component/s: documentation > Update metrics doc to add tiered storage metrics > > > Key: KAFKA-15293 > URL: https://issues.apache.org/jira/browse/KAFKA-15293 > Project: Kafka > Issue Type: Sub-task > Components: documentation >Reporter: Abhijeet Kumar >Assignee: Abhijeet Kumar >Priority: Critical > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761806#comment-17761806 ]

Satish Duggana commented on KAFKA-13421:
----------------------------------------

Moving it to 3.7.0 as there has not been much activity on this issue for the last few weeks.

> Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> ---------------------------------------------------------------------------------------------
>
> Key: KAFKA-13421
> URL: https://issues.apache.org/jira/browse/KAFKA-13421
> Project: Kafka
> Issue Type: Bug
> Reporter: Colin McCabe
> Assignee: Philip Nee
> Priority: Blocker
> Labels: clients, consumer, flaky-test, unit-test
> Fix For: 3.6.0
>
>
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup is failing with this error:
> {code}
> ConsumerBounceTest > testSubscribeWhenTopicUnavailable() PASSED
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() failed, log available in /home/cmccabe/src/kafka9/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup().test.stdout
>
> ConsumerBounceTest > testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() FAILED
>     org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
>         at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
>         at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
>         at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
>         at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
>         at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:320)
>         at kafka.integration.KafkaServerTestHarness.$anonfun$restartDeadBrokers$2(KafkaServerTestHarness.scala:212)
>         at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
>         at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
>         at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
>         at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
>         at kafka.integration.KafkaServerTestHarness.restartDeadBrokers(KafkaServerTestHarness.scala:203)
>         at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:327)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>         at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319)
> {code}
[jira] [Updated] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-13421:
-----------------------------------
    Fix Version/s: 3.7.0
                       (was: 3.6.0)

> Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> ---------------------------------------------------------------------------------------------
>
> Key: KAFKA-13421
> URL: https://issues.apache.org/jira/browse/KAFKA-13421
> Project: Kafka
> Issue Type: Bug
> Reporter: Colin McCabe
> Assignee: Philip Nee
> Priority: Blocker
> Labels: clients, consumer, flaky-test, unit-test
> Fix For: 3.7.0
>
>
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup is failing with this error:
> {code}
> ConsumerBounceTest > testSubscribeWhenTopicUnavailable() PASSED
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() failed, log available in /home/cmccabe/src/kafka9/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup().test.stdout
>
> ConsumerBounceTest > testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() FAILED
>     org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
>         at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
>         at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
>         at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
>         at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
>         at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:320)
>         at kafka.integration.KafkaServerTestHarness.$anonfun$restartDeadBrokers$2(KafkaServerTestHarness.scala:212)
>         at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
>         at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
>         at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
>         at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
>         at kafka.integration.KafkaServerTestHarness.restartDeadBrokers(KafkaServerTestHarness.scala:203)
>         at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:327)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>         at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319)
> {code}
[GitHub] [kafka] showuon commented on pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)
showuon commented on PR #14307: URL: https://github.com/apache/kafka/pull/14307#issuecomment-1705153637 Will wait for @clolov 's review before merging it.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14331: Add documentation for tiered storage metrics
divijvaidya commented on code in PR #14331: URL: https://github.com/apache/kafka/pull/14331#discussion_r1314856715 ## docs/ops.html: ## @@ -1545,6 +1545,51 @@ https://github.com/apache/kafka/blob/d34d84dbef20559e68c899315a0915e9dd740cb0/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java#L64 ## docs/ops.html: ## @@ -1545,6 +1545,51 @@
[GitHub] [kafka] satishd commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)
satishd commented on code in PR #14307:
URL: https://github.com/apache/kafka/pull/14307#discussion_r1314858032

## storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java: ##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class PartitionsExpandTest extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 2;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final Integer broker0 = 0;
+        final Integer broker1 = 1;
+        final String topicA = "topicA";
+        final Integer p0 = 0;
+        final Integer p1 = 1;
+        final Integer p2 = 2;
+        final Integer partitionCount = 1;
+        final Integer replicationFactor = 2;
+        final Integer maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final List p0Assignment = Arrays.asList(broker0, broker1);
+        final List p1Assignment = Arrays.asList(broker0, broker1);
+        final List p2Assignment = Arrays.asList(broker1, broker0);
+
+        builder
+                .createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                        Collections.singletonMap(p0, p0Assignment), enableRemoteLogStorage)
+                // produce events to partition 0
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
+                .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // expand the topicA partition-count to 3
+                .createPartitions(topicA, 3, mkMap(mkEntry(p1, p1Assignment), mkEntry(p2, p2Assignment)))
+                // consume from the beginning of the topic to read data from local and remote storage for partition 0
+                .expectFetchFromTieredStorage(broker0, topicA, p0, 2)
+                .consume(topicA, p0, 0L, 3, 2)
+
+                .expectLeader(topicA, p1, broker0, false)
+                .expectLeader(topicA, p2, broker1, false)
+
+                // produce events to partition 1
+                .expectSegmentToBeOffloaded(broker0, topicA, p1, 0, new KeyValueSpec("k0", "v0"))

Review Comment:
Sure, we can look into that later.
[GitHub] [kafka] yashmayya commented on pull request #14152: KAFKA-7438: Migrate WindowStoreBuilderTest from EasyMock to Mockito
yashmayya commented on PR #14152: URL: https://github.com/apache/kafka/pull/14152#issuecomment-1705144816 Thanks Divij and no worries, this was pretty low priority!
[GitHub] [kafka] kamalcph commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)
kamalcph commented on code in PR #14307:
URL: https://github.com/apache/kafka/pull/14307#discussion_r1314851919

## storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java: ##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class PartitionsExpandTest extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 2;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final Integer broker0 = 0;
+        final Integer broker1 = 1;
+        final String topicA = "topicA";
+        final Integer p0 = 0;
+        final Integer p1 = 1;
+        final Integer p2 = 2;
+        final Integer partitionCount = 1;
+        final Integer replicationFactor = 2;
+        final Integer maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final List p0Assignment = Arrays.asList(broker0, broker1);
+        final List p1Assignment = Arrays.asList(broker0, broker1);
+        final List p2Assignment = Arrays.asList(broker1, broker0);
+
+        builder
+                .createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                        Collections.singletonMap(p0, p0Assignment), enableRemoteLogStorage)
+                // produce events to partition 0
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
+                .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // expand the topicA partition-count to 3
+                .createPartitions(topicA, 3, mkMap(mkEntry(p1, p1Assignment), mkEntry(p2, p2Assignment)))
+                // consume from the beginning of the topic to read data from local and remote storage for partition 0
+                .expectFetchFromTieredStorage(broker0, topicA, p0, 2)
+                .consume(topicA, p0, 0L, 3, 2)
+
+                .expectLeader(topicA, p1, broker0, false)
+                .expectLeader(topicA, p2, broker1, false)
+
+                // produce events to partition 1
+                .expectSegmentToBeOffloaded(broker0, topicA, p1, 0, new KeyValueSpec("k0", "v0"))

Review Comment:
Prefer to keep the format as it is. We can refactor it later if required.
[GitHub] [kafka] divijvaidya merged pull request #14152: KAFKA-7438: Migrate WindowStoreBuilderTest from EasyMock to Mockito
divijvaidya merged PR #14152: URL: https://github.com/apache/kafka/pull/14152
[GitHub] [kafka] divijvaidya commented on pull request #14152: KAFKA-7438: Migrate WindowStoreBuilderTest from EasyMock to Mockito
divijvaidya commented on PR #14152: URL: https://github.com/apache/kafka/pull/14152#issuecomment-1705136079 I have verified that the test passes in CI https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14152/1/testReport/org.apache.kafka.streams.state.internals/WindowStoreBuilderTest/ and the rest of the failures are flaky/unrelated.
[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14331: Add documentation for tiered storage metrics
abhijeetk88 commented on code in PR #14331: URL: https://github.com/apache/kafka/pull/14331#discussion_r1314820039 ## docs/ops.html: ## @@ -1545,6 +1545,51 @@
[GitHub] [kafka] satishd commented on a diff in pull request #14331: Add documentation for tiered storage metrics
satishd commented on code in PR #14331: URL: https://github.com/apache/kafka/pull/14331#discussion_r1314813270 ## docs/ops.html: ## @@ -1545,6 +1545,51 @@
[GitHub] [kafka] divijvaidya merged pull request #14223: KAFKA-14133: Move AbstractStreamTest and RocksDBMetricsRecordingTriggerTest to Mockito
divijvaidya merged PR #14223: URL: https://github.com/apache/kafka/pull/14223
[jira] [Updated] (KAFKA-15352) Ensure consistency while deleting the remote log segments
[ https://issues.apache.org/jira/browse/KAFKA-15352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kamal Chandraprakash updated KAFKA-15352:
-----------------------------------------
    Description:
In KAFKA-14888, the remote log segments that breach the retention time/size are deleted before the log-start-offset is updated. If a consumer starts to read from the beginning of the topic in the middle of the deletion, it will fail to read the messages and UNKNOWN_SERVER_ERROR will be thrown back to the consumer.

To ensure consistency, similar to local log segments where the actual segments are deleted after {{segment.delete.delay.ms}}, we should update the log-start-offset first before deleting the remote log segment.

See the [PR#13561|https://github.com/apache/kafka/pull/13561] and [comment|https://github.com/apache/kafka/pull/13561#discussion_r1293086543] for more details.

Case-2:
The old leader (now a follower) can delete the remote log segment in the middle of a leader election. We need to update the log-start-offset metadata for this case.

See this comment [https://github.com/apache/kafka/pull/13561#discussion_r1293081560]

  was:
In KAFKA-14888, the remote log segments that breach the retention time/size are deleted before the log-start-offset is updated. If a consumer starts to read from the beginning of the topic in the middle of the deletion, it will fail to read the messages and UNKNOWN_SERVER_ERROR will be thrown back to the consumer.

To ensure consistency, similar to local log segments where the actual segments are deleted after {{segment.delete.delay.ms}}, we should update the log-start-offset first before deleting the remote log segment.

See the [PR#13561|https://github.com/apache/kafka/pull/13561] and [comment|https://github.com/apache/kafka/pull/13561#discussion_r1293086543] for more details.

> Ensure consistency while deleting the remote log segments
> ---------------------------------------------------------
>
> Key: KAFKA-15352
> URL: https://issues.apache.org/jira/browse/KAFKA-15352
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Kamal Chandraprakash
> Assignee: Kamal Chandraprakash
> Priority: Blocker
> Fix For: 3.6.0
>
>
> In KAFKA-14888, the remote log segments that breach the retention time/size are deleted before the log-start-offset is updated. If a consumer starts to read from the beginning of the topic in the middle of the deletion, it will fail to read the messages and UNKNOWN_SERVER_ERROR will be thrown back to the consumer.
>
> To ensure consistency, similar to local log segments where the actual segments are deleted after {{segment.delete.delay.ms}}, we should update the log-start-offset first before deleting the remote log segment.
>
> See the [PR#13561|https://github.com/apache/kafka/pull/13561] and [comment|https://github.com/apache/kafka/pull/13561#discussion_r1293086543] for more details.
>
> Case-2:
> The old leader (now a follower) can delete the remote log segment in the middle of a leader election. We need to update the log-start-offset metadata for this case.
>
> See this comment [https://github.com/apache/kafka/pull/13561#discussion_r1293081560]
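The ordering proposed in the ticket above (advance the log-start-offset first, delete the remote segments afterwards) can be illustrated with a small in-memory sketch. This is not the `RemoteLogManager` implementation; the class `RemoteRetentionSketch` and its methods are hypothetical names, and the segment layout is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RemoteRetentionSketch {
    /** Simulated remote segments: base offset -> last offset (inclusive). */
    private final TreeMap<Long, Long> remoteSegments = new TreeMap<>();
    private long logStartOffset = 0L;
    /** Ordered record of what happened, so the ordering can be inspected. */
    final List<String> events = new ArrayList<>();

    void addSegment(long baseOffset, long lastOffset) {
        remoteSegments.put(baseOffset, lastOffset);
    }

    long logStartOffset() {
        return logStartOffset;
    }

    /** A read succeeds only if the offset is visible and a segment still covers it. */
    boolean canRead(long offset) {
        if (offset < logStartOffset) {
            return false; // below the start offset: the reader gets a clean out-of-range answer
        }
        Map.Entry<Long, Long> segment = remoteSegments.floorEntry(offset);
        return segment != null && offset <= segment.getValue();
    }

    /** Expires all segments that end before newStartOffset. */
    void expireBefore(long newStartOffset) {
        // Step 1: publish the new log-start-offset, so no reader is directed at doomed data.
        if (newStartOffset > logStartOffset) {
            logStartOffset = newStartOffset;
            events.add("log-start-offset=" + newStartOffset);
        }
        // Step 2: only now remove the segments that lie entirely below it.
        List<Long> expired = new ArrayList<>();
        for (Map.Entry<Long, Long> segment : remoteSegments.entrySet()) {
            if (segment.getValue() < newStartOffset) {
                expired.add(segment.getKey());
            }
        }
        for (Long baseOffset : expired) {
            remoteSegments.remove(baseOffset);
            events.add("deleted-segment@" + baseOffset);
        }
    }

    public static void main(String[] args) {
        RemoteRetentionSketch log = new RemoteRetentionSketch();
        log.addSegment(0, 9);
        log.addSegment(10, 19);
        log.addSegment(20, 29);
        log.expireBefore(20);
        System.out.println(log.events);
    }
}
```

With this ordering, any offset at or above the published log-start-offset is still backed by a segment while the deletion runs, which is the consistency property the ticket asks for.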
[jira] [Updated] (KAFKA-15351) Update log-start-offset after leader election for topics enabled with remote storage
[ https://issues.apache.org/jira/browse/KAFKA-15351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kamal Chandraprakash updated KAFKA-15351:
-----------------------------------------
    Description:
Case-1:
In the FETCH response, the leader-log-start-offset will be piggy-backed. But, there can be a scenario:
 # The leader deletes the remote log segment and updates its log-start-offset.
 # Before replica-2 updates its log-start-offset via a FETCH request, the leadership changes to replica-2.
 # There are no more eligible segments to delete from remote.
 # The log-start-offset will be stale (referring to the old log-start-offset although the data was already removed from remote).
 # If the consumer starts to read from the beginning of the topic, it will fail to read.

See this comment [https://github.com/apache/kafka/pull/13561#discussion_r1293081560] for more details.

Case-3:
When tiered storage is enabled on the topic, and the last-standing-replica is restarted, then the log-start-offset should be updated up to the log-start-offset-checkpoint offset.

  was:
Case-1:
In the FETCH response, the leader-log-start-offset will be piggy-backed. But, there can be a scenario:
 # The leader deletes the remote log segment and updates its log-start-offset.
 # Before replica-2 updates its log-start-offset via a FETCH request, the leadership changes to replica-2.
 # There are no more eligible segments to delete from remote.
 # The log-start-offset will be stale (referring to the old log-start-offset although the data was already removed from remote).
 # If the consumer starts to read from the beginning of the topic, it will fail to read.

See this comment [https://github.com/apache/kafka/pull/13561#discussion_r1293081560] for more details.

Case-2:
The old leader (now a follower) can delete the remote log segment in the middle of a leader election. We need to update the log-start-offset metadata for this case.

Case-3:
When tiered storage is enabled on the topic, and the last-standing-replica is restarted, then the log-start-offset should be updated up to the log-start-offset-checkpoint offset.

> Update log-start-offset after leader election for topics enabled with remote storage
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-15351
> URL: https://issues.apache.org/jira/browse/KAFKA-15351
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Kamal Chandraprakash
> Assignee: Kamal Chandraprakash
> Priority: Blocker
> Fix For: 3.6.0
>
>
> Case-1:
> In the FETCH response, the leader-log-start-offset will be piggy-backed. But, there can be a scenario:
> # The leader deletes the remote log segment and updates its log-start-offset.
> # Before replica-2 updates its log-start-offset via a FETCH request, the leadership changes to replica-2.
> # There are no more eligible segments to delete from remote.
> # The log-start-offset will be stale (referring to the old log-start-offset although the data was already removed from remote).
> # If the consumer starts to read from the beginning of the topic, it will fail to read.
>
> See this comment [https://github.com/apache/kafka/pull/13561#discussion_r1293081560] for more details.
>
> Case-3:
> When tiered storage is enabled on the topic, and the last-standing-replica is restarted, then the log-start-offset should be updated up to the log-start-offset-checkpoint offset.
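The Case-1 scenario in the ticket above can be replayed in a few lines. The sketch below is a toy model, not broker code: `LeaderElectionOffsetSketch`, `RemoteStore`, `Replica`, and in particular the reconciliation rule in `onBecomeLeader` (take the larger of the replica's own log-start-offset and the earliest offset still present in remote storage) are assumptions made for illustration; the ticket itself does not prescribe a specific fix.

```java
import java.util.OptionalLong;
import java.util.TreeSet;

final class LeaderElectionOffsetSketch {
    /** Simulated remote storage: base offsets of the segments that still exist. */
    static final class RemoteStore {
        final TreeSet<Long> segmentBaseOffsets = new TreeSet<>();

        OptionalLong earliestOffset() {
            return segmentBaseOffsets.isEmpty()
                    ? OptionalLong.empty()
                    : OptionalLong.of(segmentBaseOffsets.first());
        }
    }

    static final class Replica {
        long logStartOffset;

        Replica(long logStartOffset) {
            this.logStartOffset = logStartOffset;
        }

        /** Normal path: a follower learns the leader's log-start-offset from a FETCH response. */
        void onFetchResponse(long leaderLogStartOffset) {
            logStartOffset = Math.max(logStartOffset, leaderLogStartOffset);
        }

        /** Hypothetical reconciliation step when this replica becomes leader. */
        void onBecomeLeader(RemoteStore remote) {
            OptionalLong earliest = remote.earliestOffset();
            if (earliest.isPresent() && earliest.getAsLong() > logStartOffset) {
                logStartOffset = earliest.getAsLong();
            }
        }
    }

    public static void main(String[] args) {
        RemoteStore remote = new RemoteStore();
        remote.segmentBaseOffsets.add(0L);
        remote.segmentBaseOffsets.add(100L);
        remote.segmentBaseOffsets.add(200L);

        Replica leader = new Replica(0L);
        Replica replica2 = new Replica(0L);

        // Step 1: the leader deletes two remote segments and advances its own offset.
        remote.segmentBaseOffsets.remove(0L);
        remote.segmentBaseOffsets.remove(100L);
        leader.logStartOffset = 200L;

        // Step 2: leadership moves before replica2 has fetched, so its offset is stale.
        System.out.println("stale log-start-offset on replica2: " + replica2.logStartOffset);

        // Reconciliation on election brings it back in line with what remote storage holds.
        replica2.onBecomeLeader(remote);
        System.out.println("log-start-offset after reconciliation: " + replica2.logStartOffset);
    }
}
```

Taking the maximum keeps the offset monotonic, so a replica that is already ahead of remote storage is never moved backwards by the reconciliation.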
[GitHub] [kafka] kamalcph commented on a diff in pull request #14330: KAFKA-15410: Delete records with tiered storage integration test (4/4)
kamalcph commented on code in PR #14330: URL: https://github.com/apache/kafka/pull/14330#discussion_r1314697994 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1066,50 +1066,53 @@ public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata // Check for out of bound epochs between segment epochs and current leader epochs. Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey(); Integer segmentLastEpoch = segmentLeaderEpochs.lastKey(); + +// FIXME: We have to remove the below check too for `DELETE_RECORDS` API to work properly. Review Comment: I am not clear on why the `isRemoteSegmentWithinLeaderEpochs` validation is used in the segment deletion path. I have commented out the code for now and will go through it.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14322: KAFKA-15418: update statement on decompression
divijvaidya commented on code in PR #14322: URL: https://github.com/apache/kafka/pull/14322#discussion_r1314750361 ## docs/design.html: ## @@ -136,8 +136,10 @@ -Kafka supports this with an efficient batching format. A batch of messages can be clumped together compressed and sent to the server in this form. This batch of messages will be written in compressed form and will -remain compressed in the log and will only be decompressed by the consumer. +Kafka supports this with an efficient batching format. A batch of messages can be grouped together, compressed, and sent to the server in this form. The broker decompresses the batch in order to validate it. For +example, it validates that the number of records in the batch is same as what batch header states. The broker may also potentially modify the batch (e.g., if the topic is compacted, the broker will filter out Review Comment: I just realised another thing. "if the topic is compacted, the broker will filter out records eligible for compaction prior to writing to disk" Are you referring to the fact that records written to a compacted topic must have a non-null key, or else they will be rejected? If so, perhaps we need to phrase it differently. Let me get back to you with a suggestion here in a couple of hours.
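The two validations mentioned in this review comment can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the broker's actual code: `BatchValidationSketch` and `validate` are hypothetical names, each record is reduced to its key, and the null-key rule for compacted topics is the behaviour the reviewer is asking about rather than something this thread confirms.

```java
import java.util.List;

// Hypothetical sketch; none of these names come from the Kafka code base.
final class BatchValidationSketch {

    // Throws if the decompressed batch disagrees with the record count in its
    // header, or if a record bound for a compacted topic carries no key.
    static void validate(int headerRecordCount, List<String> recordKeys, boolean compactedTopic) {
        if (recordKeys.size() != headerRecordCount) {
            throw new IllegalArgumentException("Batch header states " + headerRecordCount
                    + " records but " + recordKeys.size() + " were found");
        }
        if (compactedTopic) {
            for (String key : recordKeys) {
                if (key == null) {
                    throw new IllegalArgumentException(
                            "Compacted topic cannot accept a record without a key");
                }
            }
        }
    }
}
```

Both checks need the individual records, which is why the batch has to be decompressed before it can be validated.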
[jira] [Commented] (KAFKA-14936) Add Grace Period To Stream Table Join
[ https://issues.apache.org/jira/browse/KAFKA-14936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761766#comment-17761766 ] Satish Duggana commented on KAFKA-14936: [~wcarlson5] Is this completed for 3.6.0? If yes, please close the JIRA by updating the respective fix version fields. > Add Grace Period To Stream Table Join > - > > Key: KAFKA-14936 > URL: https://issues.apache.org/jira/browse/KAFKA-14936 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Walker Carlson >Assignee: Walker Carlson >Priority: Major > Labels: kip, streams > > Include the grace period for stream table joins as described in KIP-923. > Also add a RocksDB time-based queueing implementation of > `TimeOrderedKeyValueBuffer`.
[jira] [Commented] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor
[ https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761763#comment-17761763 ] Satish Duggana commented on KAFKA-12473: Removing the KIP from 3.6.0 release plan as it does not seem to be completed. > Make the "cooperative-sticky, range" as the default assignor > > > Key: KAFKA-12473 > URL: https://issues.apache.org/jira/browse/KAFKA-12473 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Assignee: Luke Chen >Priority: Critical > Labels: kip > > Now that 3.0 is coming up, we can change the default > ConsumerPartitionAssignor to something better than the RangeAssignor. The > original plan was to switch over to the StickyAssignor, but now that we have > incremental cooperative rebalancing we should consider using the new > CooperativeStickyAssignor instead: this will enable the consumer group to > follow the COOPERATIVE protocol, improving the rebalancing experience OOTB. > KIP: > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248
[GitHub] [kafka] kamalcph commented on a diff in pull request #14330: KAFKA-15410: Delete records with tiered storage integration test (4/4)
kamalcph commented on code in PR #14330: URL: https://github.com/apache/kafka/pull/14330#discussion_r1314695377 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1066,50 +1066,53 @@ public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata // Check for out of bound epochs between segment epochs and current leader epochs. Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey(); Integer segmentLastEpoch = segmentLeaderEpochs.lastKey(); + +// FIXME: We have to remove the below check too for `DELETE_RECORDS` API to work properly. if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) { -LOGGER.debug("[{}] Remote segment {} is not within the partition leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}", -segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs); +LOGGER.debug("Segment {} is not within the partition leader epoch lineage. " + +"Remote segment epochs: {} and partition leader epochs: {}", +segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs); return false; } for (Map.Entry entry : segmentLeaderEpochs.entrySet()) { int epoch = entry.getKey(); -long offset = entry.getValue(); +//long offset = entry.getValue(); // If segment's epoch does not exist in the leader epoch lineage then it is not a valid segment. if (!leaderEpochs.containsKey(epoch)) { -LOGGER.debug("[{}] Remote segment {}'s epoch {} is not within the leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}", -segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs); +LOGGER.debug("Segment {} epoch {} is not within the leader epoch lineage. 
" + +"Remote segment epochs: {} and partition leader epochs: {}", +segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs); return false; } // Segment's first epoch's offset should be more than or equal to the respective leader epoch's offset. -if (epoch == segmentFirstEpoch && offset < leaderEpochs.get(epoch)) { -LOGGER.debug("[{}] Remote segment {}'s first epoch {}'s offset is less than leader epoch's offset {}.", -segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch)); -return false; -} +//if (epoch == segmentFirstEpoch && offset < leaderEpochs.get(epoch)) { +//LOGGER.debug("Segment {} first epoch {} offset is less than leader epoch offset {}.", +//segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch)); +//return false; +//} // Segment's end offset should be less than or equal to the respective leader epoch's offset. if (epoch == segmentLastEpoch) { Map.Entry nextEntry = leaderEpochs.higherEntry(epoch); if (nextEntry != null && segmentEndOffset > nextEntry.getValue() - 1) { -LOGGER.debug("[{}] Remote segment {}'s end offset {} is more than leader epoch's offset {}.", -segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), segmentEndOffset, nextEntry.getValue() - 1); +LOGGER.debug("Segment {} end offset {} is more than leader epoch offset {}.", +segmentMetadata.remoteLogSegmentId(), segmentEndOffset, nextEntry.getValue() - 1); return false; } } // Next segment epoch entry and next leader epoch entry should be same to ensure that the segment's epoch // is within the leader epoch lineage. if (epoch != segmentLastEpoch && !leaderEpochs.higherEntry(epoch).equals(segmentLeaderEpochs.higherEntry(epoch))) { -LOGGER.debug("[{}] Remote segment {}'s epoch {} is not within the leader epoch lineage. 
Remote segment epochs: {} and partition leader epochs: {}", -segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs); +LOGGER.debug("Segment {} epoch {} is not within the leader epoch lineage. " + Review Comment: The logger context already holds the partition information, so I removed the `segmentMetadata.topicIdPartition()` placeholder from the log statement.
[GitHub] [kafka] kamalcph commented on a diff in pull request #14330: KAFKA-15410: Delete records with tiered storage integration test (4/4)
kamalcph commented on code in PR #14330: URL: https://github.com/apache/kafka/pull/14330#discussion_r1314695377 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1066,50 +1066,53 @@ public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata // Check for out of bound epochs between segment epochs and current leader epochs. Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey(); Integer segmentLastEpoch = segmentLeaderEpochs.lastKey(); + +// FIXME: We have to remove the below check too for `DELETE_RECORDS` API to work properly. if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) { -LOGGER.debug("[{}] Remote segment {} is not within the partition leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}", -segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs); +LOGGER.debug("Segment {} is not within the partition leader epoch lineage. " + +"Remote segment epochs: {} and partition leader epochs: {}", +segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs); return false; } for (Map.Entry entry : segmentLeaderEpochs.entrySet()) { int epoch = entry.getKey(); -long offset = entry.getValue(); +//long offset = entry.getValue(); // If segment's epoch does not exist in the leader epoch lineage then it is not a valid segment. if (!leaderEpochs.containsKey(epoch)) { -LOGGER.debug("[{}] Remote segment {}'s epoch {} is not within the leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}", -segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs); +LOGGER.debug("Segment {} epoch {} is not within the leader epoch lineage. 
" + +"Remote segment epochs: {} and partition leader epochs: {}", +segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs); return false; } // Segment's first epoch's offset should be more than or equal to the respective leader epoch's offset. -if (epoch == segmentFirstEpoch && offset < leaderEpochs.get(epoch)) { -LOGGER.debug("[{}] Remote segment {}'s first epoch {}'s offset is less than leader epoch's offset {}.", -segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch)); -return false; -} +//if (epoch == segmentFirstEpoch && offset < leaderEpochs.get(epoch)) { +//LOGGER.debug("Segment {} first epoch {} offset is less than leader epoch offset {}.", +//segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch)); +//return false; +//} // Segment's end offset should be less than or equal to the respective leader epoch's offset. if (epoch == segmentLastEpoch) { Map.Entry nextEntry = leaderEpochs.higherEntry(epoch); if (nextEntry != null && segmentEndOffset > nextEntry.getValue() - 1) { -LOGGER.debug("[{}] Remote segment {}'s end offset {} is more than leader epoch's offset {}.", -segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), segmentEndOffset, nextEntry.getValue() - 1); +LOGGER.debug("Segment {} end offset {} is more than leader epoch offset {}.", +segmentMetadata.remoteLogSegmentId(), segmentEndOffset, nextEntry.getValue() - 1); return false; } } // Next segment epoch entry and next leader epoch entry should be same to ensure that the segment's epoch // is within the leader epoch lineage. if (epoch != segmentLastEpoch && !leaderEpochs.higherEntry(epoch).equals(segmentLeaderEpochs.higherEntry(epoch))) { -LOGGER.debug("[{}] Remote segment {}'s epoch {} is not within the leader epoch lineage. 
Remote segment epochs: {} and partition leader epochs: {}", -segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs); +LOGGER.debug("Segment {} epoch {} is not within the leader epoch lineage. " + Review Comment: The logger context already holds the partition information, so I removed the `partition` placeholder from the logs.
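For readers following the diff above, the lineage check can be sketched in isolation as shown below. This is a simplified illustration, not the `RemoteLogManager` code itself: `EpochLineageSketch`, `isWithinLineage`, and the `checkFirstEpochOffset` flag are hypothetical names, logging is omitted, and the flag only mimics the check that the diff comments out. Both maps go from leader epoch to that epoch's start offset.

```java
import java.util.Map;
import java.util.NavigableMap;

// Hypothetical, simplified sketch of the checks visible in the diff.
final class EpochLineageSketch {

    static boolean isWithinLineage(NavigableMap<Integer, Long> segmentEpochs,
                                   long segmentEndOffset,
                                   NavigableMap<Integer, Long> leaderEpochs,
                                   boolean checkFirstEpochOffset) {
        if (segmentEpochs.isEmpty() || leaderEpochs.isEmpty()) {
            return false;
        }
        int first = segmentEpochs.firstKey();
        int last = segmentEpochs.lastKey();
        // Segment epochs must lie within the range of leader epochs.
        if (first < leaderEpochs.firstKey() || last > leaderEpochs.lastKey()) {
            return false;
        }
        for (Map.Entry<Integer, Long> entry : segmentEpochs.entrySet()) {
            int epoch = entry.getKey();
            long offset = entry.getValue();
            Long leaderStart = leaderEpochs.get(epoch);
            // Every segment epoch must exist in the leader epoch lineage.
            if (leaderStart == null) {
                return false;
            }
            // The check that the diff comments out: the first epoch's offset
            // must not precede the leader's start offset for that epoch.
            if (checkFirstEpochOffset && epoch == first && offset < leaderStart) {
                return false;
            }
            if (epoch == last) {
                // The segment must end before the next leader epoch begins.
                Map.Entry<Integer, Long> next = leaderEpochs.higherEntry(epoch);
                if (next != null && segmentEndOffset > next.getValue() - 1) {
                    return false;
                }
            } else if (!leaderEpochs.higherEntry(epoch).equals(segmentEpochs.higherEntry(epoch))) {
                // The following epoch entries must match in both maps.
                return false;
            }
        }
        return true;
    }
}
```

With leader epochs `{1=120, 2=200}` and a segment whose only epoch entry is `{1=100}` ending at offset 150, the sketch returns `false` when the flag is set and `true` when it is cleared. That such a state arises after `DELETE_RECORDS` moves the start offset forward is an assumption drawn from the FIXME comment; the thread itself does not spell it out.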
[GitHub] [kafka] dajac commented on pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest
dajac commented on PR #14321: URL: https://github.com/apache/kafka/pull/14321#issuecomment-1704917384 @rreddy-22 @CalvinConfluent @jolshan Thanks for your comments. I have addressed all of them. I have also changed the schema a bit and fixed a few other tests.
[GitHub] [kafka] dajac commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest
dajac commented on code in PR #14321: URL: https://github.com/apache/kafka/pull/14321#discussion_r1314671160 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ## @@ -631,4 +631,33 @@ public void testValidateOffsetCommit() { // This should succeed. group.validateOffsetCommit("member-id", "", 0); } + +@Test +public void testValidateOffsetFetch() { +SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); +ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo"); + +// Simulate a call from the admin client without member id and member epoch. +// This should pass only if the group is empty. +group.validateOffsetFetch("", -1, Long.MAX_VALUE); + +// The member does not exist. +assertThrows(UnknownMemberIdException.class, () -> +group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE)); + +// Create a member. +snapshotRegistry.getOrCreateSnapshot(0); +group.getOrMaybeCreateMember("member-id", true); + +// The member does not exist at epoch 0. Review Comment: This comment is confusing. The epoch was referring to the epoch of the snapshot here. I rephrased it to make it clearer.
[GitHub] [kafka] dajac commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest
dajac commented on code in PR #14321: URL: https://github.com/apache/kafka/pull/14321#discussion_r131483 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ## @@ -631,4 +631,33 @@ public void testValidateOffsetCommit() { // This should succeed. group.validateOffsetCommit("member-id", "", 0); } + +@Test +public void testValidateOffsetFetch() { +SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); +ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo"); + +// Simulate a call from the admin client without member id and member epoch. +// This should pass only if the group is empty. Review Comment: That part of the comment was wrong. I just removed it.
[GitHub] [kafka] dajac commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest
dajac commented on code in PR #14321: URL: https://github.com/apache/kafka/pull/14321#discussion_r1314665557 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ## @@ -181,10 +181,28 @@ public List fetchOffsets( String groupId, List topics, long committedOffset +) { Review Comment: This or to simulate the admin client case.
[GitHub] [kafka] dajac commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest
dajac commented on code in PR #14321: URL: https://github.com/apache/kafka/pull/14321#discussion_r1314664510 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ## @@ -181,10 +181,28 @@ public List fetchOffsets( String groupId, List topics, long committedOffset +) { +return fetchOffsets( +groupId, +"", +-1, +topics, +committedOffset +); +} + +public List fetchOffsets( +String groupId, +String memberId, +int memberEpoch, +List topics, +long committedOffset ) { OffsetFetchResponseData.OffsetFetchResponseGroup response = offsetMetadataManager.fetchOffsets( new OffsetFetchRequestData.OffsetFetchRequestGroup() .setGroupId(groupId) +.setMemberId(memberId) Review Comment: Correct.
[GitHub] [kafka] dajac commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest
dajac commented on code in PR #14321: URL: https://github.com/apache/kafka/pull/14321#discussion_r1314663349 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -823,9 +823,17 @@ public void validateOffsetCommit( /** * Validates the OffsetFetch request. + * + * @param memberId The member id. This is not provided for generic groups. + * @param memberEpoch The member epoch for consumer groups. This is not provided for generic groups. + * @param lastCommittedOffset The last committed offsets in the timeline. */ @Override -public void validateOffsetFetch() throws GroupIdNotFoundException { +public void validateOffsetFetch( +String memberId, Review Comment: Correct. I mentioned this in the javadoc of the method.
[GitHub] [kafka] dajac commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest
dajac commented on code in PR #14321: URL: https://github.com/apache/kafka/pull/14321#discussion_r1314661887 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ## @@ -1409,6 +1444,82 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() { ), context.fetchAllOffsets("group", Long.MAX_VALUE)); } +@Test +public void testConsumerGroupOffsetFetchWithMemberIdAndEpoch() { +OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); +// Create consumer group. +ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true); +// Create member. +group.getOrMaybeCreateMember("member", true); +// Commit offset. +context.commitOffset("group", "foo", 0, 100L, 1); + +// Fetch offsets case. +List topics = Collections.singletonList( +new OffsetFetchRequestData.OffsetFetchRequestTopics() +.setName("foo") +.setPartitionIndexes(Collections.singletonList(0)) +); + +assertEquals(Collections.singletonList( +new OffsetFetchResponseData.OffsetFetchResponseTopics() +.setName("foo") +.setPartitions(Collections.singletonList( +mkOffsetPartitionResponse(0, 100L, 1, "metadata") +)) +), context.fetchOffsets("group", "member", 0, topics, Long.MAX_VALUE)); + +// Fetch all offsets case. +assertEquals(Collections.singletonList( +new OffsetFetchResponseData.OffsetFetchResponseTopics() +.setName("foo") +.setPartitions(Collections.singletonList( +mkOffsetPartitionResponse(0, 100L, 1, "metadata") +)) +), context.fetchAllOffsets("group", "member", 0, Long.MAX_VALUE)); +} + +@Test +public void testConsumerGroupOffsetFetchWithUnknownMemberId() { +OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); +context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true); + +// Fetch offsets case. 
+List topics = Collections.singletonList( +new OffsetFetchRequestData.OffsetFetchRequestTopics() +.setName("foo") +.setPartitionIndexes(Collections.singletonList(0)) +); + +assertThrows(UnknownMemberIdException.class, +() -> context.fetchOffsets("group", "member", 0, topics, Long.MAX_VALUE)); + +// Fetch all offsets case. +assertThrows(UnknownMemberIdException.class, +() -> context.fetchAllOffsets("group", "member", 0, Long.MAX_VALUE)); +} + +@Test +public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() { Review Comment: Sure.
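The behaviour exercised by the tests quoted in this thread (an admin-client call without member id or epoch, an unknown member, and a stale member epoch) can be sketched as a small stand-alone class. This is an illustration under assumptions, not the `ConsumerGroup` implementation: `OffsetFetchValidationSketch`, its nested exception types, and `addMember` are hypothetical, and the `lastCommittedOffset` snapshot parameter from the real signature is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; names and exception types are illustrative only.
final class OffsetFetchValidationSketch {

    static final class UnknownMemberException extends RuntimeException {
        UnknownMemberException(String message) {
            super(message);
        }
    }

    static final class StaleEpochException extends RuntimeException {
        StaleEpochException(String message) {
            super(message);
        }
    }

    // Member id -> current member epoch.
    private final Map<String, Integer> memberEpochs = new HashMap<>();

    void addMember(String memberId, int epoch) {
        memberEpochs.put(memberId, epoch);
    }

    void validateOffsetFetch(String memberId, int memberEpoch) {
        // Admin-client style request: no member id and no member epoch.
        if ((memberId == null || memberId.isEmpty()) && memberEpoch < 0) {
            return;
        }
        Integer currentEpoch = memberEpochs.get(memberId);
        if (currentEpoch == null) {
            throw new UnknownMemberException("Member " + memberId + " is not in the group");
        }
        if (memberEpoch != currentEpoch) {
            throw new StaleEpochException("Received epoch " + memberEpoch
                    + " but the member is at epoch " + currentEpoch);
        }
    }
}
```

The five-argument `fetchOffsets` helper in the test context maps onto this directly: the shorter overload passes `""` and `-1`, which takes the early-return branch.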
[jira] [Updated] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
[ https://issues.apache.org/jira/browse/KAFKA-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-6527: -- Fix Version/s: 3.7.0 (was: 3.6.0) > Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig > > > Key: KAFKA-6527 > URL: https://issues.apache.org/jira/browse/KAFKA-6527 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Blocker > Labels: flakey > Fix For: 3.7.0 > > > {code:java} > java.lang.AssertionError: Log segment size increase not applied > at kafka.utils.TestUtils$.fail(TestUtils.scala:355) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865) > at > kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348) > {code}
[jira] [Commented] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
[ https://issues.apache.org/jira/browse/KAFKA-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761748#comment-17761748 ] Satish Duggana commented on KAFKA-6527: --- Moved to 3.7.0 as this JIRA is not yet assigned and we are near code freeze for 3.6.0. > Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig > > > Key: KAFKA-6527 > URL: https://issues.apache.org/jira/browse/KAFKA-6527 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Blocker > Labels: flakey > Fix For: 3.7.0 > > > {code:java} > java.lang.AssertionError: Log segment size increase not applied > at kafka.utils.TestUtils$.fail(TestUtils.scala:355) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865) > at > kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348) > {code}
[GitHub] [kafka] showuon commented on pull request #13908: KAFKA-15052 Fix the flaky testBalancePartitionLeaders - part II
showuon commented on PR #13908: URL: https://github.com/apache/kafka/pull/13908#issuecomment-1704888456 Backported into 3.6 branch.
[jira] [Resolved] (KAFKA-15052) Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()
[ https://issues.apache.org/jira/browse/KAFKA-15052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-15052. --- Resolution: Fixed > Fix flaky test QuorumControllerTest.testBalancePartitionLeaders() > - > > Key: KAFKA-15052 > URL: https://issues.apache.org/jira/browse/KAFKA-15052 > Project: Kafka > Issue Type: Test >Reporter: Dimitar Dimitrov >Assignee: Dimitar Dimitrov >Priority: Major > Labels: flaky-test > Fix For: 3.6.0 > > > Test failed at > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1892/tests/] > as well as in various local runs. > The test creates a topic, fences a broker, notes partition imbalance due to > another broker taking over the partition the fenced broker lost, re-registers > and unfences the fenced broker, sends {{AlterPartition}} for the lost > partition adding the now unfenced broker back to its ISR, then waits for the > partition imbalance to disappear. > The local failures seem to happen when the brokers (including the ones that > never get fenced by the test) accidentally get fenced by losing their session > due to reaching the (aggressively low for test purposes) session timeout. > The Cloudbees failure quoted above also seems to indicate that this happened: > {code:java} > ...[truncated 738209 chars]... > 23. (org.apache.kafka.controller.QuorumController:768) > [2023-06-02 18:17:22,202] DEBUG [QuorumController id=0] Scheduling write > event for maybeBalancePartitionLeaders because scheduled (DEFERRED), > checkIntervalNs (OptionalLong[10]) and isImbalanced (true) > (org.apache.kafka.controller.QuorumController:1401) > [2023-06-02 18:17:22,202] INFO [QuorumController id=0] Fencing broker 2 > because its session has timed out. 
> (org.apache.kafka.controller.ReplicationControlManager:1459) > [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] handleBrokerFenced: > changing partition(s): foo-0, foo-1, foo-2 > (org.apache.kafka.controller.ReplicationControlManager:1750) > [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] partition change for > foo-0 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 > -> 3, partitionEpoch: 2 -> 3 > (org.apache.kafka.controller.ReplicationControlManager:157) > [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for > foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: isr: [2, 3] -> [3], leaderEpoch: > 3 -> 4, partitionEpoch: 4 -> 5 > (org.apache.kafka.controller.ReplicationControlManager:157) > [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for > foo-2 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 > -> 3, partitionEpoch: 2 -> 3 > (org.apache.kafka.controller.ReplicationControlManager:157) > [2023-06-02 18:17:22,205] DEBUG append(batch=LocalRecordBatch(leaderEpoch=1, > appendTimestamp=240, > records=[ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, > removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at > version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=1, > topicId=033_QSX7TfitL4SDzoeR4w, isr=[3], leader=3, replicas=null, > removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at > version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=2, > topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, > removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at > version 0), ApiMessageAndVersion(BrokerRegistrationChangeRecord(brokerId=2, > brokerEpoch=3, fenced=1, inControlledShutdown=0) at version 0)]), > prevOffset=27) (org.apache.kafka.metalog.LocalLogManager$SharedLogData:253) > [2023-06-02 18:17:22,205] 
DEBUG [QuorumController id=0] Creating in-memory > snapshot 27 (org.apache.kafka.timeline.SnapshotRegistry:197) > [2023-06-02 18:17:22,205] DEBUG [LocalLogManager 0] Node 0: running log > check. (org.apache.kafka.metalog.LocalLogManager:512) > [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Read-write operation > maybeFenceReplicas(451616131) will be completed when the log reaches offset > 27. (org.apache.kafka.controller.QuorumController:768) > [2023-06-02 18:17:22,208] INFO [QuorumController id=0] Fencing broker 3 > because its session has timed out. > (org.apache.kafka.controller.ReplicationControlManager:1459) > [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] handleBrokerFenced: > changing partition(s): foo-1 > (org.apache.kafka.controller.ReplicationControlManager:1750) > [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] partition change for > foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 3 -> -1, leaderEpoch: 4 > ->
[jira] [Updated] (KAFKA-15052) Fix flaky test QuorumControllerTest.testBalancePartitionLeaders()
[ https://issues.apache.org/jira/browse/KAFKA-15052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-15052: -- Fix Version/s: 3.6.0 (was: 3.7.0) > Fix flaky test QuorumControllerTest.testBalancePartitionLeaders() > - > > Key: KAFKA-15052 > URL: https://issues.apache.org/jira/browse/KAFKA-15052 > Project: Kafka > Issue Type: Test >Reporter: Dimitar Dimitrov >Assignee: Dimitar Dimitrov >Priority: Major > Labels: flaky-test > Fix For: 3.6.0 > > > Test failed at > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1892/tests/] > as well as in various local runs. > The test creates a topic, fences a broker, notes partition imbalance due to > another broker taking over the partition the fenced broker lost, re-registers > and unfences the fenced broker, sends {{AlterPartition}} for the lost > partition adding the now unfenced broker back to its ISR, then waits for the > partition imbalance to disappear. > The local failures seem to happen when the brokers (including the ones that > never get fenced by the test) accidentally get fenced by losing their session > due to reaching the (aggressively low for test purposes) session timeout. > The Cloudbees failure quoted above also seems to indicate that this happened: > {code:java} > ...[truncated 738209 chars]... > 23. (org.apache.kafka.controller.QuorumController:768) > [2023-06-02 18:17:22,202] DEBUG [QuorumController id=0] Scheduling write > event for maybeBalancePartitionLeaders because scheduled (DEFERRED), > checkIntervalNs (OptionalLong[10]) and isImbalanced (true) > (org.apache.kafka.controller.QuorumController:1401) > [2023-06-02 18:17:22,202] INFO [QuorumController id=0] Fencing broker 2 > because its session has timed out. 
> (org.apache.kafka.controller.ReplicationControlManager:1459) > [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] handleBrokerFenced: > changing partition(s): foo-0, foo-1, foo-2 > (org.apache.kafka.controller.ReplicationControlManager:1750) > [2023-06-02 18:17:22,203] DEBUG [QuorumController id=0] partition change for > foo-0 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 > -> 3, partitionEpoch: 2 -> 3 > (org.apache.kafka.controller.ReplicationControlManager:157) > [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for > foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w: isr: [2, 3] -> [3], leaderEpoch: > 3 -> 4, partitionEpoch: 4 -> 5 > (org.apache.kafka.controller.ReplicationControlManager:157) > [2023-06-02 18:17:22,204] DEBUG [QuorumController id=0] partition change for > foo-2 with topic ID 033_QSX7TfitL4SDzoeR4w: leader: 2 -> -1, leaderEpoch: 2 > -> 3, partitionEpoch: 2 -> 3 > (org.apache.kafka.controller.ReplicationControlManager:157) > [2023-06-02 18:17:22,205] DEBUG append(batch=LocalRecordBatch(leaderEpoch=1, > appendTimestamp=240, > records=[ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, > removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at > version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=1, > topicId=033_QSX7TfitL4SDzoeR4w, isr=[3], leader=3, replicas=null, > removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at > version 0), ApiMessageAndVersion(PartitionChangeRecord(partitionId=2, > topicId=033_QSX7TfitL4SDzoeR4w, isr=null, leader=-1, replicas=null, > removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1) at > version 0), ApiMessageAndVersion(BrokerRegistrationChangeRecord(brokerId=2, > brokerEpoch=3, fenced=1, inControlledShutdown=0) at version 0)]), > prevOffset=27) (org.apache.kafka.metalog.LocalLogManager$SharedLogData:253) > [2023-06-02 18:17:22,205] 
DEBUG [QuorumController id=0] Creating in-memory > snapshot 27 (org.apache.kafka.timeline.SnapshotRegistry:197) > [2023-06-02 18:17:22,205] DEBUG [LocalLogManager 0] Node 0: running log > check. (org.apache.kafka.metalog.LocalLogManager:512) > [2023-06-02 18:17:22,205] DEBUG [QuorumController id=0] Read-write operation > maybeFenceReplicas(451616131) will be completed when the log reaches offset > 27. (org.apache.kafka.controller.QuorumController:768) > [2023-06-02 18:17:22,208] INFO [QuorumController id=0] Fencing broker 3 > because its session has timed out. > (org.apache.kafka.controller.ReplicationControlManager:1459) > [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] handleBrokerFenced: > changing partition(s): foo-1 > (org.apache.kafka.controller.ReplicationControlManager:1750) > [2023-06-02 18:17:22,209] DEBUG [QuorumController id=0] partition change for > foo-1 with topic ID 033_QSX7TfitL4SDzoeR4w:
[GitHub] [kafka] showuon merged pull request #13908: KAFKA-15052 Fix the flaky testBalancePartitionLeaders - part II
showuon merged PR #13908: URL: https://github.com/apache/kafka/pull/13908
[GitHub] [kafka] showuon commented on a diff in pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)
showuon commented on code in PR #14307: URL: https://github.com/apache/kafka/pull/14307#discussion_r1314623009 ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java: ## @@ -56,7 +56,7 @@ public Integer getBrokerId() { /** * Wait until the first segment offset in Apache Kafka storage for the given topic-partition is - * equal or greater to the provided offset. + * equal to the provided offset. Review Comment: Thanks for the fix. ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionTimeTest.java: ## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.common.config.TopicConfig; + +import java.util.Collections; +import java.util.Map; + +public final class DeleteSegmentsByRetentionTimeTest extends DeleteSegmentsByRetentionSizeTest { Review Comment: It's surprising the retentionTimeTest extends retentionSizeTest. Could we create an abstract `BaseDeleteSegmentsTest` class, and put the test spec there. Then we can have retentionTimeTest and retentionSizeTest tests extends the `BaseDeleteSegmentsTest`. 
## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -463,17 +463,21 @@ class BrokerServer( new KafkaConfig(config.originals(), true) // Start RemoteLogManager before broker start serving the requests. - remoteLogManagerOpt.foreach(rlm => { + remoteLogManagerOpt.foreach { rlm => val listenerName = config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName() if (listenerName != null) { - val endpoint = endpoints.stream.filter(e => e.listenerName.equals(ListenerName.normalised(listenerName))) + val endpoint = endpoints.stream +.filter(e => + e.listenerName().isPresent && + ListenerName.normalised(e.listenerName().get()).equals(ListenerName.normalised(listenerName)) +) Review Comment: Nice catch! ## storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectEmptyRemoteStorageAction.java: ## @@ -36,8 +36,8 @@ public ExpectEmptyRemoteStorageAction(TopicPartition topicPartition) { public void doExecute(TieredStorageTestContext context) throws InterruptedException { TestUtils.waitForCondition(() -> { LocalTieredStorageSnapshot snapshot = context.takeTieredStorageSnapshot(); -return !snapshot.getTopicPartitions().contains(topicPartition) && -snapshot.getFilesets(topicPartition).isEmpty(); +// With KAFKA-15166, snapshot should not contain the topicPartition Review Comment: OK, so I think we should add all the explanation into the comment. From the current comment, it's not clear to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
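The `BaseDeleteSegmentsTest` refactoring suggested in the review above could be sketched roughly as follows. This is only an illustration under stated assumptions: the method names `configsToBeAdded` and `describeSpec` and the `...Sketch` class names are hypothetical, and the real tiered-storage test spec builder is not reproduced here. Only the topic config keys `retention.bytes` and `retention.ms` are real Kafka names.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch: the shared test spec lives in an abstract base class,
// and each concrete test supplies only the retention config that differs.
abstract class BaseDeleteSegmentsTest {
    // Subclasses return the topic-level config that should trigger segment deletion.
    protected abstract Map<String, String> configsToBeAdded();

    // Stand-in for the shared test spec (assumption: the real spec would be
    // written once here against the tiered-storage test harness).
    final String describeSpec() {
        return "produce, offload, then update topic config with " + configsToBeAdded();
    }
}

final class DeleteSegmentsByRetentionSizeSketch extends BaseDeleteSegmentsTest {
    @Override
    protected Map<String, String> configsToBeAdded() {
        // "retention.bytes" is the key behind TopicConfig.RETENTION_BYTES_CONFIG.
        return Collections.singletonMap("retention.bytes", "1");
    }
}

final class DeleteSegmentsByRetentionTimeSketch extends BaseDeleteSegmentsTest {
    @Override
    protected Map<String, String> configsToBeAdded() {
        // "retention.ms" is the key behind TopicConfig.RETENTION_MS_CONFIG.
        return Collections.singletonMap("retention.ms", "1");
    }
}
```

With this shape neither concrete test inherits from the other, so the time-based test no longer depends on the size-based one.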
[GitHub] [kafka] abhijeetk88 opened a new pull request, #14331: Add documentation for tiered storage metrics
abhijeetk88 opened a new pull request, #14331: URL: https://github.com/apache/kafka/pull/14331 Added documentation for tiered storage metrics. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-15430) Kafla create replca partition on controller node
[ https://issues.apache.org/jira/browse/KAFKA-15430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrii Vysotskiy updated KAFKA-15430: - Description: I have configuration 5 nodes (KRAFT mode), with next roles: 4 broker+controller and 1 controller. Create topic with replication factor 5, and it is created, and describe show that topic partition have 5 replicas. {{/opt/kafka/latest/bin/kafka-topics.sh --create --bootstrap-server=dc1-prod-kafka-001-vs:9092 --replication-factor 5 --partitions 1 --topic test5}} {{/opt/kafka/latest/bin/kafka-topics.sh --describe --topic test5 --bootstrap-server=dc1-prod-kafka-001-vs:9092 Topic: test5 TopicId: amuqr8EgRmqeKryUHZwsMA PartitionCount: 1 ReplicationFactor: 5 Configs: segment.bytes=1073741824 Topic: test5 Partition: 0 Leader: 3 Replicas: 3,4,1,2,5 Isr: 3,4,1,2}} Replicas 5 and ISR 4. Why does kafka initially allow you to create a replica on the controller node, although in reality the replica is not created on the controller node and there are no topic files in the log directory. Is this expected behavior or not? Thanks. I want to understand whether such behavior is the norm for Kafka was: I have configuration 5 nodes, with next roles: 4 broker+controller and 1 controller. Create topic with replication factor 5, and it is created, and describe show that topic partition have 5 replicas. {{/opt/kafka/latest/bin/kafka-topics.sh --create --bootstrap-server=dc1-prod-kafka-001-vs:9092 --replication-factor 5 --partitions 1 --topic test5}} {{/opt/kafka/latest/bin/kafka-topics.sh --describe --topic test5 --bootstrap-server=dc1-prod-kafka-001-vs:9092 Topic: test5 TopicId: amuqr8EgRmqeKryUHZwsMA PartitionCount: 1 ReplicationFactor: 5 Configs: segment.bytes=1073741824 Topic: test5 Partition: 0 Leader: 3 Replicas: 3,4,1,2,5 Isr: 3,4,1,2}} Replicas 5 and ISR 4. 
Why does kafka initially allow you to create a replica on the controller node, although in reality the replica is not created on the controller node and there are no topic files in the log directory. Is this expected behavior or not? Thanks. I want to understand whether such behavior is the norm for Kafka > Kafla create replca partition on controller node > > > Key: KAFKA-15430 > URL: https://issues.apache.org/jira/browse/KAFKA-15430 > Project: Kafka > Issue Type: Test > Components: kraft >Affects Versions: 3.5.1 >Reporter: Andrii Vysotskiy >Priority: Minor > > I have configuration 5 nodes (KRAFT mode), with next roles: 4 > broker+controller and 1 controller. Create topic with replication factor 5, > and it is created, and describe show that topic partition have 5 replicas. > > {{/opt/kafka/latest/bin/kafka-topics.sh --create > --bootstrap-server=dc1-prod-kafka-001-vs:9092 --replication-factor 5 > --partitions 1 --topic test5}} > > {{/opt/kafka/latest/bin/kafka-topics.sh --describe --topic test5 > --bootstrap-server=dc1-prod-kafka-001-vs:9092 > Topic: test5 TopicId: amuqr8EgRmqeKryUHZwsMA PartitionCount: 1 > ReplicationFactor: 5 Configs: segment.bytes=1073741824 > Topic: test5 Partition: 0 Leader: 3 Replicas: 3,4,1,2,5 Isr: 3,4,1,2}} > > Replicas 5 and ISR 4. Why does kafka initially allow you to create a replica > on the controller node, although in reality the replica is not created on the > controller node and there are no topic files in the log directory. > Is this expected behavior or not? Thanks. > I want to understand whether such behavior is the norm for Kafka
[jira] [Created] (KAFKA-15430) Kafla create replca partition on controller node
Andrii Vysotskiy created KAFKA-15430: Summary: Kafla create replca partition on controller node Key: KAFKA-15430 URL: https://issues.apache.org/jira/browse/KAFKA-15430 Project: Kafka Issue Type: Test Components: kraft Affects Versions: 3.5.1 Reporter: Andrii Vysotskiy I have a 5-node configuration with the following roles: 4 broker+controller nodes and 1 controller node. I create a topic with replication factor 5; it is created, and describe shows that the topic partition has 5 replicas. {{/opt/kafka/latest/bin/kafka-topics.sh --create --bootstrap-server=dc1-prod-kafka-001-vs:9092 --replication-factor 5 --partitions 1 --topic test5}} {{/opt/kafka/latest/bin/kafka-topics.sh --describe --topic test5 --bootstrap-server=dc1-prod-kafka-001-vs:9092 Topic: test5 TopicId: amuqr8EgRmqeKryUHZwsMA PartitionCount: 1 ReplicationFactor: 5 Configs: segment.bytes=1073741824 Topic: test5 Partition: 0 Leader: 3 Replicas: 3,4,1,2,5 Isr: 3,4,1,2}} There are 5 replicas but only 4 in the ISR. Why does Kafka initially allow a replica to be assigned to the controller node, although in reality the replica is not created on the controller node and there are no topic files in the log directory? Is this expected behavior or not? Thanks. I want to understand whether such behavior is normal for Kafka.
[GitHub] [kafka] omkreddy merged pull request #14318: KAFKA-15422: Update documentation for Delegation Tokens in Kafka with KRaft
omkreddy merged PR #14318: URL: https://github.com/apache/kafka/pull/14318
[GitHub] [kafka] omkreddy commented on pull request #14318: KAFKA-15422: Update documentation for Delegation Tokens in Kafka with KRaft
omkreddy commented on PR #14318: URL: https://github.com/apache/kafka/pull/14318#issuecomment-1704861029 Test failures are not related, merging this minor doc change.
[GitHub] [kafka] elkkhan commented on a diff in pull request #14077: KAFKA-14112: Expose replication-offset-lag Mirror metric
elkkhan commented on code in PR #14077: URL: https://github.com/apache/kafka/pull/14077#discussion_r1314605208 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceMetrics.java: ## @@ -104,12 +108,25 @@ class MirrorSourceMetrics implements AutoCloseable { replicationLatencyAvg = new MetricNameTemplate( "replication-latency-ms-avg", SOURCE_CONNECTOR_GROUP, "Average time it takes records to replicate from source to target cluster.", partitionTags); +replicationOffsetLag = new MetricNameTemplate( Review Comment: @hudeqi I hear you now - we do have a disagreement about the lag definition and it was an oversight on my end. I will copy your reply over to the mailing thread and will answer there to keep the discussion in one place
[GitHub] [kafka] hudeqi commented on a diff in pull request #14077: KAFKA-14112: Expose replication-offset-lag Mirror metric
hudeqi commented on code in PR #14077: URL: https://github.com/apache/kafka/pull/14077#discussion_r1314592488 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceMetrics.java: ## @@ -104,12 +108,25 @@ class MirrorSourceMetrics implements AutoCloseable { replicationLatencyAvg = new MetricNameTemplate( "replication-latency-ms-avg", SOURCE_CONNECTOR_GROUP, "Average time it takes records to replicate from source to target cluster.", partitionTags); +replicationOffsetLag = new MetricNameTemplate( Review Comment: I guess we disagree about the definition of lag? My understanding of lag is: the real LEO (log end offset) of the source cluster partition minus the LEO that has been written to the target cluster. Your definition seems to be: the gap between the mirror task polling data from the source and writing it to the target cluster?
[GitHub] [kafka] dajac commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest
dajac commented on code in PR #14321: URL: https://github.com/apache/kafka/pull/14321#discussion_r1314590003 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -538,17 +537,46 @@ public void validateOffsetCommit( if (memberEpoch < 0 && members().isEmpty()) return; final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false); -if (memberEpoch != member.memberEpoch()) { -throw Errors.STALE_MEMBER_EPOCH.exception(); -} +validateMemberEpoch(memberEpoch, member.memberEpoch()); } /** * Validates the OffsetFetch request. + * + * @param memberId The member id for consumer groups. + * @param memberEpoch The member epoch for consumer groups. + * @param lastCommittedOffset The last committed offsets in the timeline. */ @Override -public void validateOffsetFetch() { -// Nothing. +public void validateOffsetFetch( +String memberId, +int memberEpoch, +long lastCommittedOffset +) throws UnknownMemberIdException, StaleMemberEpochException { +// When the member epoch is -1, the request comes from the admin client. In this case, +// the request can commit offsets if the group is empty. Review Comment: The comment is wrong. Let me fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hudeqi commented on a diff in pull request #14077: KAFKA-14112: Expose replication-offset-lag Mirror metric
hudeqi commented on code in PR #14077: URL: https://github.com/apache/kafka/pull/14077#discussion_r1314583243 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ## @@ -282,6 +285,38 @@ SourceRecord convertRecord(ConsumerRecord record) { record.timestamp(), headers); } +//visible for testing +void reportReplicationOffsetLag(ConsumerRecords lastPolledRecords) { +Set partitions = lastPolledRecords.partitions(); +partitions.forEach(p -> { +try { +long replicationOffsetLag = getReplicationOffsetLagForPartition(p, lastPolledRecords.records(p)); +if (replicationOffsetLag < 0) { +log.warn("Replication offset lag for partition {} is negative({}) - " + +"skipping metric reporting for this partition.", p, replicationOffsetLag); +return; +} +metrics.replicationOffsetLag(p, replicationOffsetLag + 1); //+1 to account for zero-based offset numbering +} catch (UnsupportedOperationException e) { +log.error("Failed to calculate replication offset lag for partition {}.", p, e); +} +}); +} + +private long getReplicationOffsetLagForPartition(TopicPartition partition, + List> lastPolledRecordsForPartition) { +ConsumerRecord lastPolledRecord = + lastPolledRecordsForPartition.get(lastPolledRecordsForPartition.size() - 1); +if (!lastPolledRecord.topic().equals(partition.topic()) || lastPolledRecord.partition() != partition.partition()) { +String error = String.format( +"Unexpected topic/partition mismatch while calculating replication-offset-lag. 
Expected: %s, got: %s-%s.", +partition, lastPolledRecord.topic(), lastPolledRecord.partition()); +throw new UnsupportedOperationException(error); +} +long endOffsetForPartition = lastPolledRecord.offset(); Review Comment: My doubt is here: I think the LEO of the partition should be the log end offset of the partition in the source cluster, but `lastPolledRecord.offset()` here is only the source-cluster offset of the last record the task polled from the partition. For example, the log end offset in the source cluster may have reached 100 while the task, because of poor consumer performance, has only polled up to offset 80. The lag must then be greater than 20 (greater, because the data has only just been polled and not yet written to the target cluster; I think we agree on this). But with the logic in this PR, the LEO will be 80, even though the source cluster has actually been written up to offset 100.
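The two lag definitions contrasted in the comment above can be compared with a small arithmetic sketch. The offsets 100 and 80 come from the comment; the "last replicated" offset of 75 is a made-up value for illustration, and the class and method names are hypothetical. The sketch assumes the value subtracted in both cases is the last offset written to the target, which the truncated diff above does not confirm, and it ignores the `+ 1` adjustment made in the PR.

```java
final class ReplicationLagSketch {
    // Lag measured against the source partition's real log end offset (LEO).
    static long lagFromSourceEndOffset(long sourceLogEndOffset, long lastReplicatedOffset) {
        return sourceLogEndOffset - lastReplicatedOffset;
    }

    // Lag measured against the offset of the last record the task polled,
    // which is the end offset the reviewer says the PR uses.
    static long lagFromLastPolledOffset(long lastPolledOffset, long lastReplicatedOffset) {
        return lastPolledOffset - lastReplicatedOffset;
    }

    public static void main(String[] args) {
        long sourceLogEndOffset = 100L;   // from the review comment
        long lastPolledOffset = 80L;      // from the review comment
        long lastReplicatedOffset = 75L;  // hypothetical: last offset written to the target

        System.out.println(lagFromSourceEndOffset(sourceLogEndOffset, lastReplicatedOffset)); // 25
        System.out.println(lagFromLastPolledOffset(lastPolledOffset, lastReplicatedOffset));  // 5
    }
}
```

Under these assumed numbers the polled-offset definition reports 5 while the source-LEO definition reports 25, which is the gap the reviewer is pointing at.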
[GitHub] [kafka] clolov commented on pull request #14307: KAFKA-15410: Expand partitions, segment deletion by retention and enable remote log on topic integration tests (1/4)
clolov commented on PR #14307: URL: https://github.com/apache/kafka/pull/14307#issuecomment-1704805274 I will aim to provide a review by the end of the day! Thanks for the effort
[jira] [Commented] (KAFKA-15399) Enable OffloadAndConsumeFromLeader test
[ https://issues.apache.org/jira/browse/KAFKA-15399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761727#comment-17761727 ] Christo Lolov commented on KAFKA-15399: --- Heya [~pnee], [~lianetm] and [~showuon]! I had a look at the last 5 builds on trunk since [https://github.com/apache/kafka/pull/14319] was merged in and none of them contain the test failure detailed in https://issues.apache.org/jira/browse/KAFKA-15427: * [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2164/#showFailuresLink] * [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2165/] * [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2166/#showFailuresLink] * [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2167/] * [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2168/] I believe the issue is resolved so I will keep this Jira ticket as resolved as well, but if you manage to find an occurrence I will gladly reopen it! > Enable OffloadAndConsumeFromLeader test > --- > > Key: KAFKA-15399 > URL: https://issues.apache.org/jira/browse/KAFKA-15399 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.6.0 >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Blocker > Fix For: 3.6.0 > > > Build / JDK 17 and Scala 2.13 / initializationError – > org.apache.kafka.tiered.storage.integration.OffloadAndConsumeFromLeaderTest -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] ocadaruma commented on pull request #14242: KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance
ocadaruma commented on PR #14242: URL: https://github.com/apache/kafka/pull/14242#issuecomment-1704786562 @showuon Thank you for your review. 0) Got it. I revised the PR description to include the analysis. 2) The actual change is made in `SnapshotFile.java#renameTo`, which is called from `removeAndMarkSnapshotForDeletion`. 3) > We can always recover from logs when unclean shutdown. Yes. More precisely, though, removing the fsync on `LeaderEpochFileCache`'s truncation doesn't cause extra recovery even on unclean shutdown, IMO. The reasons: - Since we still fsync on `LeaderEpochFileCache#assign`, we can still ensure that all necessary leader epochs are in the leader-epoch cache file. - Even when a truncation is not flushed (so "should-be-truncated" epochs may be left in the epoch file after an unclean shutdown), the log-loading procedure should truncate the epoch file as necessary (based on the log start/end offset). That is a fairly lightweight operation compared to recovering from the log. 4) Hmm, I intentionally didn't create an overloaded method because I was a bit afraid that the default (fsync: true) method would be used casually in future code changes, even in places where fsync isn't necessary.
[GitHub] [kafka] showuon commented on pull request #14242: KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance
showuon commented on PR #14242: URL: https://github.com/apache/kafka/pull/14242#issuecomment-1704740274 @ocadaruma , thanks for the improvement! Some high-level questions: 0. Although you've added comments in the JIRA, it would be better to add your analysis, and what you changed and why, to the PR description. 1. Moving the fsync call to the scheduler thread in the `takeSnapshot` case makes sense to me, since we have all the info in the in-memory cache, and log recovery can rebuild the snapshot after an unclean shutdown. 2. For `removeAndMarkSnapshotForDeletion`, I didn't see this fix; could you explain it? 3. For `LeaderEpochFileCache#truncateXXX`, I agree that as long as the in-memory cache is up to date, it should be fine. We can always recover from logs when unclean shutdown. 4. nit: In the PR, could we make fewer code changes for easier review? That is, we can create an overloaded method that takes one more parameter (boolean sync) and have the original method delegate to the new one (defaulting to true). Then the only places we need to change are the ones where we want to pass `false` to skip the sync flush, which will make the PR much clearer IMO. Thank you.
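The overload-and-delegate pattern proposed in point 4 above might look like the following minimal sketch. The class `EpochCacheSketch`, the method name `truncate`, and the counters standing in for real file I/O are all hypothetical; this is not the actual `LeaderEpochFileCache` API.

```java
// Hypothetical stand-in for a cache that persists its state to a file.
final class EpochCacheSketch {
    int truncateCount = 0; // number of truncations applied
    int flushCount = 0;    // number of simulated fsync calls

    // Original signature: behavior is unchanged because it delegates with sync = true.
    void truncate(long endOffset) {
        truncate(endOffset, true);
    }

    // New overload: callers that can tolerate a lost flush pass sync = false.
    void truncate(long endOffset, boolean sync) {
        truncateCount++;   // stand-in for updating the in-memory cache and the file
        if (sync) {
            flushCount++;  // stand-in for the fsync
        }
    }

    public static void main(String[] args) {
        EpochCacheSketch cache = new EpochCacheSketch();
        cache.truncate(10L);        // existing call sites keep flushing
        cache.truncate(5L, false);  // only the changed call site skips the flush
        System.out.println(cache.truncateCount + " truncations, " + cache.flushCount + " flushes");
    }
}
```

The trade-off raised in the reply is visible here: the one-argument form is the easiest to call, so new code may flush by default even where a flush is not needed.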
[GitHub] [kafka] nizhikov commented on pull request #14217: KAFKA-14595 ReassignPartitionsCommandArgsTest rewritten in java
nizhikov commented on PR #14217: URL: https://github.com/apache/kafka/pull/14217#issuecomment-1704725921 Hello @gharris1727 Thanks for the review. Are you ready to merge this PR?
[GitHub] [kafka] satishd commented on a diff in pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)
satishd commented on code in PR #14329: URL: https://github.com/apache/kafka/pull/14329#discussion_r1314498306 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java: ## @@ -98,35 +98,28 @@ public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) throws T */ public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata, long timeoutMs) throws TimeoutException { -final int partition = recordMetadata.partition(); -final long consumeCheckIntervalMs = Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs); - -log.info("Waiting until consumer is caught up with the target partition: [{}]", partition); - +int partition = recordMetadata.partition(); // If the current assignment does not have the subscription for this partition then return immediately. if (!consumerTask.isMetadataPartitionAssigned(partition)) { -throw new KafkaException("This consumer is not assigned to the target partition " + partition + ". " + -"Partitions currently assigned: " + consumerTask.metadataPartitionsAssigned()); +throw new KafkaException("This consumer is not assigned to the target partition " + partition + +". 
Currently assigned partitions: " + consumerTask.metadataPartitionsAssigned()); } - -final long offset = recordMetadata.offset(); +long offset = recordMetadata.offset(); long startTimeMs = time.milliseconds(); +long consumeCheckIntervalMs = Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs); +log.info("Wait until the consumer is caught up with the target partition {} up-to offset {}", partition, offset); while (true) { -log.debug("Checking if partition [{}] is up to date with offset [{}]", partition, offset); long readOffset = consumerTask.readOffsetForMetadataPartition(partition).orElse(-1L); if (readOffset >= offset) { return; } - -log.debug("Expected offset [{}] for partition [{}], but the committed offset: [{}], Sleeping for [{}] to retry again", -offset, partition, readOffset, consumeCheckIntervalMs); - +log.debug("Expected offset for partition {} is {}, but the read offset is {}. " + +"Sleeping for {} ms to retry again", partition, offset, readOffset, consumeCheckIntervalMs); if (time.milliseconds() - startTimeMs > timeoutMs) { -log.warn("Expected offset for partition:[{}] is : [{}], but the committed offset: [{}] ", -partition, readOffset, offset); +log.warn("Expected offset for partition {} is {}, but the read offset is {}", +partition, offset, readOffset); Review Comment: Good catch on the ordering of the offsets in the log. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
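The wait loop in the diff above follows a common poll-until-caught-up pattern. A stripped-down sketch of that pattern, under stated assumptions, is shown here: `CatchUpWaitSketch`, `waitUntilCaughtUp`, and the `LongSupplier` standing in for `consumerTask.readOffsetForMetadataPartition(...)` are hypothetical, and the wall clock replaces Kafka's `Time` abstraction.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;

final class CatchUpWaitSketch {
    // Polls readOffset until it reaches targetOffset, or fails once timeoutMs has elapsed.
    static void waitUntilCaughtUp(LongSupplier readOffset, long targetOffset,
                                  long timeoutMs, long recheckIntervalMs)
            throws TimeoutException, InterruptedException {
        // Never sleep longer than the overall timeout.
        long checkIntervalMs = Math.min(recheckIntervalMs, timeoutMs);
        long startMs = System.currentTimeMillis();
        while (true) {
            long current = readOffset.getAsLong();
            if (current >= targetOffset) {
                return;
            }
            if (System.currentTimeMillis() - startMs > timeoutMs) {
                // Report the expected offset first and the read offset second.
                throw new TimeoutException("Expected offset is " + targetOffset
                        + ", but the read offset is " + current);
            }
            Thread.sleep(checkIntervalMs);
        }
    }
}
```

The message arguments are passed in the order the text names them (expected offset, then read offset), which is the ordering fix acknowledged in the review comment.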