Re: [PR] KAFKA-16209 : fetchSnapshot might return null if topic is created before v2.8 [kafka]
iit2009060 commented on PR #15444: URL: https://github.com/apache/kafka/pull/15444#issuecomment-1990842795 @showuon I have added the KIP as requested for the above use case. Can you please review it. https://cwiki.apache.org/confluence/display/KAFKA/KIP-1026%3A+Handling+producer+snapshot+when+upgrading+from+%3C+v2.8.0+for+Tiered+Storage -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16318 : add javadoc for kafka metric [kafka]
showuon commented on PR #15483: URL: https://github.com/apache/kafka/pull/15483#issuecomment-1990842126 @mimaison, please take a look when available. Thanks.
Re: [PR] KAFKA-16318 : add javadoc for kafka metric [kafka]
showuon commented on code in PR #15483:
URL: https://github.com/apache/kafka/pull/15483#discussion_r1520919788

## clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java: ##
@@ -20,6 +20,37 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.utils.Time;
+/**
+ * An implementation of {@link Metric} interface.
+ *
+ * A KafkaMetric is a named metric for monitoring purpose. The metric value can be a {@link Measurable} or a {@link Gauge}.
+ *
+ * metricName The name of the metric
+ * lock A lock used for reading the metric value in case of race condition
+ * time The POSIX time in milliseconds the metric is being taken
+ * metricValueProvider The metric collecting implementation that implements {@link MetricValueProvider}
+ * config The metric configuration which is a {@link MetricConfig}
+ *

Review Comment:
   This is already described in the constructor, so I think we don't need it. WDYT?
[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() with key deletion in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825529#comment-17825529 ] Jinyong Choi commented on KAFKA-15302: -- [https://github.com/apache/kafka/pull/15495] We have updated the document for users who use it similarly. > Stale value returned when using store.all() with key deletion in punctuation > function. > -- > > Key: KAFKA-15302 > URL: https://issues.apache.org/jira/browse/KAFKA-15302 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Jinyong Choi >Priority: Major > > When using the store.all() function within the Punctuation function of > this.context.schedule, the previous value is returned. In other words, even > though the value has been stored from 1 to 2, it doesn't return 2; instead, > it returns 1. > In the provided test code, you can see the output 'BROKEN !!!', and while > this doesn't occur 100% of the time, by adding logs, it's evident that during > the while loop after all() is called, the cache is flushed. As a result, the > named cache holds a null value, causing the return of a value from RocksDB. > This is observed as the value after the .get() call is different from the > expected value. This is possibly due to the consistent read functionality of > RocksDB, although the exact cause is not certain. > Of course, if you perform {{store.flush()}} before {{all()}} there won't be > any errors. 
> > * test code (forked from balajirrao and modified for this) > [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main] > > {code:java} > private void forwardAll(final long timestamp) { > // > System.err.println("forwardAll Start"); KeyValueIterator Integer> kvList = this.kvStore.all(); > while (kvList.hasNext()) { > KeyValue entry = kvList.next(); > final Record msg = new Record<>(entry.key, > entry.value, context.currentSystemTimeMs()); > final Integer storeValue = this.kvStore.get(entry.key); if > (entry.value != storeValue) { > System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: > " + entry.key + " Expected in stored(Cache or Store) value: " + storeValue + > " but KeyValueIterator value: " + entry.value); > throw new RuntimeException("Broken!"); > } this.context.forward(msg); > } > kvList.close(); > } > {code} > * log file (add log in stream source) > > {code:java} > # console log > sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1" > [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20) > ... > [info] running Coordinator 1 > appid: 95108c48-7c69-4eeb-adbd-9d091bd84933 > [0] starting instance +1 > forwardAll Start > [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but > KeyValueIterator value: 1 > # log file > ... 
> 01:05:00.382 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on > flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401 > 01:05:00.388 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush > dirtyKeys.size():7873 entries:7873 > 01:05:00.434 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.p.i.ProcessorStateManager -- stream-thread > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > stream-task [0_0] Flushed cache or buffer Counts > ... > 01:05:00.587 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.s.i.CachingKeyValueStore -- KeyValueIterator > all() > 01:05:00.588 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.RocksDBStore -- RocksDB KeyValueIterator all > 01:05:00.590 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.ThreadCache -- stream-thread > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > MemoryLRUCacheBytesIterator cache all() > 01:05:00.591 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- NamedCache allKeys() > size():325771 > 01:05:00.637 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.Nam
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520862105

## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ##
@@ -69,6 +97,15 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
     createOldMessageFormatBrokers()
     produceMessagesInOneBatch()
     verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")

Review Comment:
   This is to test the logic in `RecordsInfo#info`.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520861572

## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ##
@@ -60,6 +63,31 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
   def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
     produceMessagesInOneBatch("gzip")
     verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+    // So in this one batch test, it'll be the first offset 0
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)

Review Comment:
   Added test cases for `LogAppendTime` scenarios. This is to test the logic in `validateMessagesAndAssignOffsetsCompressed`.
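The rule under discussion in this thread (the offset reported for the max timestamp is the first offset carrying that timestamp, and with `LogAppendTime` that is the first record of the batch) can be sketched in plain Java. This is only an illustration under stated assumptions: the class `MaxTimestampOffsetSketch`, its enum, and the method `offsetOfMaxTimestamp` are hypothetical names and are not Kafka's `LogValidator` or `MemoryRecordsBuilder` code.

```java
import java.util.List;

/** Illustrative sketch only; none of these names come from the Kafka code base. */
class MaxTimestampOffsetSketch {

    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /**
     * Returns the offset of the first record that carries the batch's max timestamp.
     * timestamps.get(i) belongs to the record at offset baseOffset + i.
     */
    static long offsetOfMaxTimestamp(long baseOffset, List<Long> timestamps, TimestampType type) {
        if (timestamps.isEmpty()) {
            throw new IllegalArgumentException("batch must contain at least one record");
        }
        if (type == TimestampType.LOG_APPEND_TIME) {
            // Every record shares the broker-assigned timestamp, so the first record wins.
            return baseOffset;
        }
        long maxTimestamp = Long.MIN_VALUE;
        long offset = baseOffset;
        for (int i = 0; i < timestamps.size(); i++) {
            // Strict '>' keeps the earliest offset when several records tie for the max.
            if (timestamps.get(i) > maxTimestamp) {
                maxTimestamp = timestamps.get(i);
                offset = baseOffset + i;
            }
        }
        return offset;
    }

    public static void main(String[] args) {
        List<Long> createTimes = List.of(100L, 300L, 200L, 300L);
        // Create time: offsets 1 and 3 tie for the max, the first one (1) is reported.
        System.out.println(offsetOfMaxTimestamp(0L, createTimes, TimestampType.CREATE_TIME));
        // Log append time: the base offset (0) is reported.
        System.out.println(offsetOfMaxTimestamp(0L, createTimes, TimestampType.LOG_APPEND_TIME));
    }
}
```

The create-time branch does not depend on compression or message format, which matches the direction of the change discussed here: the mapping from timestamp to offset is computed per record rather than falling back to the last offset of the batch.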
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520858564

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ##
@@ -52,20 +53,24 @@ public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
+    private final String topicName = "topic";
     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }
     private String getTopicName(int i) {
-        return "topic" + i;
+        return topicName + i;
     }
-    public void setUp() {
+    @BeforeEach
+    public void before() {
         cluster.config().serverProperties().put("auto.create.topics.enable", false);
         cluster.config().serverProperties().put("offsets.topic.replication.factor", "1");
         cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount));
+    }
+    public void setUp() {

Review Comment:
   The `setUp` method is necessary because in `before` (i.e. `@BeforeEach`) the Kafka cluster has not been created yet, which is why we can inject custom broker properties there. In `setUp`, we can create a producer/admin to talk to the brokers.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520857036

## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ##
@@ -263,13 +262,8 @@ public RecordsInfo info() {
         } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
             return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
         } else {
-            long shallowOffsetOfMaxTimestamp;
-            // Use the last offset when dealing with record batches
-            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-                shallowOffsetOfMaxTimestamp = lastOffset;
-            else
-                shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-            return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+            // For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping
+            return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);

Review Comment:
   > This has the same issue. The semantic for MAX_TIMESTAMP is the first offset with the max timestamp. So, if timestamp is TimestampType.LOG_APPEND_TIME, we need to use the baseOffset, instead of lastOffset.

   Yes, updated.

   > Also, could we remove the shallow part in RecordsInfo.shallowOffsetOfMaxTimestamp?

   I've added a comment in another PR: https://github.com/apache/kafka/pull/15476#issuecomment-1990459827 . We can do all the renaming there.
Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]
showuon commented on PR #15476: URL: https://github.com/apache/kafka/pull/15476#issuecomment-1990459827 Please also address this comment in this PR: https://github.com/apache/kafka/pull/15474#discussion_r1520262997 . Thanks.
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
ChrisAHolland commented on PR #15507: URL: https://github.com/apache/kafka/pull/15507#issuecomment-1990325002 @chia7712 @cadonna Hi, I saw you approved some recent PRs and was wondering if you could take a look at mine, or suggest someone better suited whom I can ask. Thanks!
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520774849

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
@@ -931,17 +920,14 @@ public void testGroupEpochBumpWhenNewStaticMemberJoins() {
         ConsumerGroupMember expectedMember3 = new ConsumerGroupMember.Builder(memberId3)
             .setMemberEpoch(11)
+            .setState(MemberState.UNRELEASED_PARTITIONS)
             .setInstanceId(memberId3)
             .setPreviousMemberEpoch(0)
-            .setTargetMemberEpoch(11)
             .setClientId("client")
             .setClientHost("localhost/127.0.0.1")
             .setRebalanceTimeoutMs(5000)
             .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
             .setServerAssignorName("range")
-            .setPartitionsPendingAssignment(mkAssignment(

Review Comment:
   Do we have any way to track the partitions still pending? Or is it just a matter of the difference between the owned partitions and the assignment?
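The "difference between the owned partitions and the assignment" mentioned in the question above can be written as a per-topic set difference. The sketch below only illustrates that arithmetic; it does not answer how the group coordinator actually tracks pending partitions. The class `PendingPartitionsSketch` and the method `pending` are hypothetical, and topics are keyed by name here purely for readability.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Illustrative sketch only; not part of the group coordinator. */
class PendingPartitionsSketch {

    /** Returns, per topic, the partitions in the target assignment that the member does not own yet. */
    static Map<String, Set<Integer>> pending(
        Map<String, Set<Integer>> targetAssignment,
        Map<String, Set<Integer>> ownedPartitions
    ) {
        Map<String, Set<Integer>> result = new TreeMap<>();
        for (Map.Entry<String, Set<Integer>> entry : targetAssignment.entrySet()) {
            Set<Integer> remaining = new TreeSet<>(entry.getValue());
            remaining.removeAll(ownedPartitions.getOrDefault(entry.getKey(), Set.of()));
            // Topics with nothing left to assign are omitted from the result.
            if (!remaining.isEmpty()) {
                result.put(entry.getKey(), remaining);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> target = Map.of("foo", Set.of(0, 1, 2), "bar", Set.of(0, 1));
        Map<String, Set<Integer>> owned = Map.of("foo", Set.of(0, 1), "bar", Set.of(0, 1));
        System.out.println(pending(target, owned)); // prints {foo=[2]}
    }
}
```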
[jira] [Commented] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective
[ https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825496#comment-17825496 ]

Matthias J. Sax commented on KAFKA-16359:
-----------------------------------------

Just to clarify: we cannot re-publish a 3.7.0 artifact. We can only fix this with a 3.7.1 release. It seems we should push out 3.7.1 sooner rather than later.

> kafka-clients-3.7.0.jar published to Maven Central is defective
> ---------------------------------------------------------------
>
> Key: KAFKA-16359
> URL: https://issues.apache.org/jira/browse/KAFKA-16359
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.7.0
> Reporter: Jeremy Norris
> Assignee: Apoorv Mittal
> Priority: Critical
>
> The {{kafka-clients-3.7.0.jar}} that has been published to Maven Central is defective: its {{META-INF/MANIFEST.MF}} bogusly includes a {{Class-Path}} element:
> {code}
> Manifest-Version: 1.0
> Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10
>  .5.jar slf4j-api-1.7.36.jar
> {code}
> This bogus {{Class-Path}} element leads to compiler warnings for projects that utilize it as a dependency:
> {code}
> [WARNING] [path] bad path element ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar": no such file or directory
> [WARNING] [path] bad path element ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar": no such file or directory
> [WARNING] [path] bad path element ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar": no such file or directory
> [WARNING] [path] bad path element ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar": no such file or directory
> {code}
> Either the {{kafka-clients-3.7.0.jar}} needs to be respun and published without the bogus {{Class-Path}} element in its {{META-INF/MANIFEST.MF}} or a new release should be published that corrects this defect.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
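To see why the `Class-Path` element produces "bad path element" warnings, it can help to resolve its entries the way the report describes: relative to the directory that holds the JAR. The sketch below uses only the JDK's `java.util.jar.Manifest` to parse the manifest text quoted in the issue and lists the entries that do not exist next to the JAR. The class `ManifestClassPathSketch` and its methods are hypothetical names, and treating each entry as a plain file name is a simplification of how `Class-Path` entries are actually resolved.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

/** Illustrative sketch only; not javac's or Kafka's code. */
class ManifestClassPathSketch {

    /** Parses manifest text and returns the space-separated Class-Path entries. */
    static List<String> classPathEntries(String manifestText) {
        Manifest manifest;
        try {
            manifest = new Manifest(
                new ByteArrayInputStream(manifestText.getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = manifest.getMainAttributes().getValue(Attributes.Name.CLASS_PATH);
        List<String> entries = new ArrayList<>();
        if (value != null) {
            for (String entry : value.trim().split("\\s+")) {
                if (!entry.isEmpty()) {
                    entries.add(entry);
                }
            }
        }
        return entries;
    }

    /** Keeps the entries that do not exist next to the JAR (simplification: plain file names). */
    static List<Path> missingEntries(Path jarDirectory, List<String> entries) {
        List<Path> missing = new ArrayList<>();
        for (String entry : entries) {
            Path resolved = jarDirectory.resolve(entry);
            if (!Files.exists(resolved)) {
                missing.add(resolved);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Manifest text as quoted in the issue; the leading space marks a continuation line.
        String manifestText =
            "Manifest-Version: 1.0\n"
            + "Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10\n"
            + " .5.jar slf4j-api-1.7.36.jar\n";
        // An empty temporary directory stands in for the Maven repository folder of the JAR.
        Path jarDirectory = Files.createTempDirectory("kafka-clients-3.7.0");
        for (Path path : missingEntries(jarDirectory, classPathEntries(manifestText))) {
            System.out.println("bad path element: " + path.getFileName());
        }
        Files.delete(jarDirectory);
    }
}
```

In a Maven repository layout each dependency lives in its own directory, so none of the four entries sits beside `kafka-clients-3.7.0.jar`, which is consistent with the four warnings quoted in the issue.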
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520772129

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
@@ -338,7 +338,6 @@ public void testConsumerGroupMemberEpochValidation() {
         ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
             .setMemberEpoch(100)
             .setPreviousMemberEpoch(99)
-            .setTargetMemberEpoch(100)

Review Comment:
   Should we explicitly set the state here? I think it is stable by default, but the other builders added it. Ditto for a few more in this file.
[jira] [Resolved] (KAFKA-16357) Kafka Client JAR manifest breaks javac linting
[ https://issues.apache.org/jira/browse/KAFKA-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-16357. - Resolution: Duplicate > Kafka Client JAR manifest breaks javac linting > -- > > Key: KAFKA-16357 > URL: https://issues.apache.org/jira/browse/KAFKA-16357 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.7.0 > Environment: Linux, JDK 21 (Docker image eclipse-temurin:21-jdk-jammy) >Reporter: Jacek Wojciechowski >Priority: Critical > > I upgraded kafka-clients from 3.6.1 to 3.7.0 and discovered that my project > is not building anymore. > The reason is that kafka-clients-3.7.0.jar contains the following entry in > its JAR manifest file: > Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10 > .5.jar slf4j-api-1.7.36.jar > I'm using Maven repo to keep my dependencies and those files are not in the > same directory as kafka-clients-3.7.0.jar, so the paths in the manifest's > Class-Path are not correct. It fails my build because we build with javac > with all linting options on, in particular -Xlint:-path. It produces the > following warnings coming from javac: > [WARNING] COMPILATION WARNING : > [INFO] - > [WARNING] [path] bad path element > "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar": > no such file or directory > [WARNING] [path] bad path element > "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar": > no such file or directory > [WARNING] [path] bad path element > "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar": > no such file or directory > [WARNING] [path] bad path element > "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar": > no such file or directory > Since we have also {{-Werror}} option enabled, it turns warnings into errors > and fails our build. 
> I think our setup is quite typical: using Maven repo to store dependencies, > having linting on and -Werror. Unfortunately, it doesn't work with the > latest kafka-clients because of the entries in the manifest's Class-Path. > And I think it might affect quite a lot of projects set up in a similar way. > I don't know what was the reason to add Class-Path entry in the JAR manifest > file - but perhaps this effect was not considered. > It would be great if you removed the Class-Path entry from the JAR manifest > file.
[jira] [Updated] (KAFKA-16357) Kafka Client JAR manifest breaks javac linting
[ https://issues.apache.org/jira/browse/KAFKA-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16357: Priority: Critical (was: Major)
Re: [PR] MINOR: simplify consumer logic [kafka]
chia7712 commented on code in PR #15519:
URL: https://github.com/apache/kafka/pull/15519#discussion_r1520683129

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -529,10 +529,9 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
             // MemberEpoch - always sent
             data.setMemberEpoch(membershipManager.memberEpoch());
-            // InstanceId - only sent if has changed since the last heartbeat
-            // Always send when leaving the group as a static member
+            // InstanceId - send when leaving the group as a static member
             membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-                if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+                if (membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
                     data.setInstanceId(groupInstanceId);
                     sentFields.instanceId = groupInstanceId;

Review Comment:
   `sentFields.instanceId` is never read after this PR, so I feel it is safe to remove it. Otherwise, it will result in another warning about "never read".
[jira] [Resolved] (KAFKA-16360) Release plan of 3.x kafka releases.
[ https://issues.apache.org/jira/browse/KAFKA-16360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-16360. - Resolution: Invalid Please don't use Jira to ask questions. Jira tickets are for bug reports and features only. Questions should be asked on the user and/or dev mailing lists: https://kafka.apache.org/contact > Release plan of 3.x kafka releases. > --- > > Key: KAFKA-16360 > URL: https://issues.apache.org/jira/browse/KAFKA-16360 > Project: Kafka > Issue Type: Improvement >Reporter: kaushik srinivas >Priority: Major > > KIP > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-833%3A+Mark+KRaft+as+Production+Ready#KIP833:MarkKRaftasProductionReady-ReleaseTimeline] > mentions: > h2. Kafka 3.7 > * January 2024 > * Final release with ZK mode > But we see in Jira that some tickets are marked for the 3.8 release. Does Apache > continue to make 3.x releases with ZooKeeper and KRaft supported, > independent of pure KRaft 4.x releases? > If yes, how many more releases can be expected on the 3.x release line?
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
mjsax commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1520671475 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamWindowCloseTest.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.test.MockApiProcessor; +import org.apache.kafka.test.MockApiProcessorSupplier; +import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static java.time.Duration.ofMillis; +import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; + +public class KStreamKStreamWindowCloseTest { + +private static final String LEFT = "left"; +private static final String RIGHT = "right"; +private final static Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); +private static final Consumed CONSUMED = Consumed.with(Serdes.Integer(), Serdes.String()); +private static final JoinWindows WINDOW = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5)); + +static List streams() { +return Arrays.asList( +innerJoin(), +leftJoin(), +outerJoin() +); +} + +private static Arguments innerJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, 
CONSUMED).join( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier processorSupplier = new MockApiProcessorSupplier<>(); +stream.process(processorSupplier); +return Arguments.of(builder.build(PROPS), processorSupplier); +} + +private static Arguments leftJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, CONSUMED).leftJoin( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier processorSupplier = new MockApiProcessorSupplier<>(); +stream.process(processorSupplier); +return Arguments.of(builder.build(PROPS), processorSupplier); +} + +private static Arguments outerJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, CONSUMED).outerJoin( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +stream.process(supplier); +return Arguments.of(builder.build(PROPS), supplier); +} + +@ParameterizedTest +@MethodSource("streams") +public void recordsArrivingPostWindowCloseShouldBeDropped( +final Topology topology, +final MockApiProcessorSupplier supplier) { +
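The quoted test configures its join window with a 10 ms time difference and a 5 ms grace period and checks that records arriving after window close are dropped. As a rough sketch of what "after window close" means, the code below assumes a record's window is closed once stream time has moved past record timestamp + time difference + grace. That boundary, the class `JoinWindowCloseSketch`, and the method `isWindowClosed` are assumptions made for illustration; the quoted diff is cut off before the relevant assertions, and the actual check in Kafka Streams may differ in detail.

```java
import java.time.Duration;

/** Illustrative sketch only; not the Kafka Streams implementation. */
class JoinWindowCloseSketch {

    /**
     * Assumed rule: the window of a record is closed once stream time exceeds
     * recordTimestamp + timeDifference + grace.
     */
    static boolean isWindowClosed(
        long recordTimestampMs,
        long streamTimeMs,
        Duration timeDifference,
        Duration grace
    ) {
        long windowCloseMs = recordTimestampMs + timeDifference.toMillis() + grace.toMillis();
        return streamTimeMs > windowCloseMs;
    }

    public static void main(String[] args) {
        Duration timeDifference = Duration.ofMillis(10);
        Duration grace = Duration.ofMillis(5);
        // Stream time 15: the record at timestamp 0 is still inside window + grace.
        System.out.println(isWindowClosed(0L, 15L, timeDifference, grace));
        // Stream time 16: the window has closed, so the record would be dropped.
        System.out.println(isWindowClosed(0L, 16L, timeDifference, grace));
    }
}
```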
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
mjsax commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1520627804 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java: ## @@ -25,36 +25,46 @@ public final class StreamStreamJoinUtil { -private StreamStreamJoinUtil(){ +private StreamStreamJoinUtil() { } public static boolean skipRecord( final Record record, final Logger logger, final Sensor droppedRecordsSensor, -final ProcessorContext context) { +final ProcessorContext context +) { // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record // // we also ignore the record if value is null, because in a key-value data model a null-value indicates // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record -- // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored if (record.key() == null || record.value() == null) { -if (context.recordMetadata().isPresent()) { -final RecordMetadata recordMetadata = context.recordMetadata().get(); -logger.warn( -"Skipping record due to null key or value. " -+ "topic=[{}] partition=[{}] offset=[{}]", -recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset() -); -} else { -logger.warn( -"Skipping record due to null key or value. Topic, partition, and offset not known." -); -} -droppedRecordsSensor.record(); +logSkip("null key or value", logger, droppedRecordsSensor, context); return true; } else { return false; } } + +public static void logSkip( +final String reason, +final Logger logger, +final Sensor droppedRecordsSensor, +final ProcessorContext context +) { +if (context.recordMetadata().isPresent()) { +final RecordMetadata recordMetadata = context.recordMetadata().get(); +logger.warn( +"Skipping record. 
reason=[{}] topic=[{}] partition=[{}] offset=[{}]", Review Comment: ```suggestion "Skipping record. Reason=[{}] topic=[{}] partition=[{}] offset=[{}]", ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
mjsax commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1520626385 ## streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java: ## @@ -71,11 +71,21 @@ void afterEach() { @Test void testRelaxedLeftStreamStreamJoin() { leftStream -.leftJoin(rightStream, JOINER, WINDOW) +.leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS) .to(OUT); initTopology(); -left.pipeInput(null, "leftValue", 1); -assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList()); +left.pipeInput(null, "leftValue1", 1); +left.pipeInput(null, "leftValue2", 90); +left.pipeInput(null, "lateArrival-Dropped", 19); +left.pipeInput(null, "lateArrivalWithinGrace", 20); +assertEquals( +Arrays.asList( +new KeyValue<>(null, "leftValue1|null"), +new KeyValue<>(null, "leftValue2|null"), +new KeyValue<>(null, "lateArrivalWithinGrace|null") Review Comment: Oh. This is the `null`-key case. My bad. For a `null` key we know it won't join in the future, so there is no reason to artificially delay the output. Thanks for pointing it out.
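The timestamps in the test above (1, 90, 19, 20) can be checked by hand. The following is a minimal sketch, assuming that `WINDOW_60MS_GRACE_10MS` means a 60 ms time difference with a 10 ms grace period, that stream time is the largest timestamp seen so far, and that a record is dropped once `timestamp + timeDifference + grace < streamTime`. The class and method names are hypothetical and are not part of Kafka Streams.

```java
public class WindowCloseSketch {

    // Hypothetical helper: true if the record's window has already closed
    // relative to the current stream time.
    static boolean isLate(long recordTs, long streamTime, long timeDifferenceMs, long graceMs) {
        return recordTs + timeDifferenceMs + graceMs < streamTime;
    }

    public static void main(String[] args) {
        long streamTime = 0L;
        long[] timestamps = {1L, 90L, 19L, 20L};
        for (long ts : timestamps) {
            // Stream time only moves forward.
            streamTime = Math.max(streamTime, ts);
            System.out.println("ts=" + ts + " streamTime=" + streamTime
                + " late=" + isLate(ts, streamTime, 60L, 10L));
        }
    }
}
```

Under these assumptions only the record with timestamp 19 is late (19 + 70 = 89 < 90), while timestamp 20 sits exactly on the boundary and is still accepted, which matches the three expected outputs in the test.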
Re: [PR] MINOR: Remove deprecation and exception throw in ProcessorRecordContext#hashcode [kafka]
mjsax commented on PR #15508: URL: https://github.com/apache/kafka/pull/15508#issuecomment-1989699015 What Bruno says. Git history is your friend :) https://github.com/apache/kafka/pull/6602/files#r278810522 I think we should keep the code as-is and close this PR w/o merging (or add a more detailed comment explaining why the code is better this way).
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
mjsax commented on code in PR #15510: URL: https://github.com/apache/kafka/pull/15510#discussion_r1520612406 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords( try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { TimestampedKeyAndJoinSide prevKey = null; +boolean outerJoinLeftBreak = false; +boolean outerJoinRightBreak = false; while (it.hasNext()) { -boolean outerJoinLeftBreak = false; -boolean outerJoinRightBreak = false; Review Comment: Oops... I wonder why we did not catch this in the unit tests?
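The effect of moving the two declarations out of the loop can be shown in isolation. This is a minimal sketch, not the actual `emitNonJoinedOuterRecords` logic: a flag declared inside the loop body is re-initialized on every iteration, so a "stop" decision made for one entry is forgotten for the next one. All names here are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class LoopFlagSketch {

    // Flag declared inside the loop: it is reset to false on every iteration,
    // so entries after the first "too large" value are still counted.
    static int countWithFlagInsideLoop(List<Integer> values, int limit) {
        int emitted = 0;
        for (int v : values) {
            boolean stop = false;
            if (v > limit) {
                stop = true;
            }
            if (!stop) {
                emitted++;
            }
        }
        return emitted;
    }

    // Flag declared outside the loop: once set, it stays set for all later entries.
    static int countWithFlagOutsideLoop(List<Integer> values, int limit) {
        int emitted = 0;
        boolean stop = false;
        for (int v : values) {
            if (v > limit) {
                stop = true;
            }
            if (!stop) {
                emitted++;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 5, 2, 3);
        System.out.println(countWithFlagInsideLoop(values, 4));  // prints 3
        System.out.println(countWithFlagOutsideLoop(values, 4)); // prints 1
    }
}
```

A unit test only catches this kind of bug if its input contains entries after the point where the flag is first set, which may be why it went unnoticed.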
[jira] [Created] (KAFKA-16366) Refactor KTable source optimization
Matthias J. Sax created KAFKA-16366: --- Summary: Refactor KTable source optimization Key: KAFKA-16366 URL: https://issues.apache.org/jira/browse/KAFKA-16366 Project: Kafka Issue Type: Improvement Components: streams Reporter: Matthias J. Sax Kafka Streams DSL offers an optimization to re-use an input topic as table changelog, in favor of creating a dedicated changelog topic. So far, the Processor API did not support any such feature, and thus when the DSL compiles down into a Topology, we needed to access topology internal stuff to allow for this optimization. With KIP-813 (merged for AK 3.8), we added `Topology#addReadOnlyStateStore` as public API, and thus we should refactor the DSL compilation code, to use this public API to build the `Topology` instead of internal APIs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: simplify consumer logic [kafka]
mjsax commented on code in PR #15519: URL: https://github.com/apache/kafka/pull/15519#discussion_r1520582633 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -529,10 +529,9 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); -// InstanceId - only sent if has changed since the last heartbeat -// Always send when leaving the group as a static member +// InstanceId - send when leaving the group as a static member membershipManager.groupInstanceId().ifPresent(groupInstanceId -> { -if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { +if (membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { data.setInstanceId(groupInstanceId); sentFields.instanceId = groupInstanceId; Review Comment: I don't understand enough about this part of the code to judge...
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1520575467 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -36,40 +37,79 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { val topicName = "foo" var adminClient: Admin = _ + var setOldMessageFormat: Boolean = false @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) createTopic(topicName, 1, 1.toShort) -produceMessages() adminClient = Admin.create(Map[String, Object]( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers() ).asJava) } @AfterEach override def tearDown(): Unit = { +setOldMessageFormat = false Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testEarliestOffset(quorum: String): Unit = { -val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest()) -assertEquals(0, earliestOffset.offset()) + def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = { Review Comment: > We can give the test a more descriptive name though, right? Sure. Sorry for the lazy naming :)
Re: [PR] MINOR: simplify consumer logic [kafka]
chia7712 commented on code in PR #15519: URL: https://github.com/apache/kafka/pull/15519#discussion_r1520572340 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -529,10 +529,9 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); -// InstanceId - only sent if has changed since the last heartbeat -// Always send when leaving the group as a static member +// InstanceId - send when leaving the group as a static member membershipManager.groupInstanceId().ifPresent(groupInstanceId -> { -if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { +if (membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { data.setInstanceId(groupInstanceId); sentFields.instanceId = groupInstanceId; Review Comment: It seems `sentFields.instanceId` is no longer used after this PR. Maybe we should remove it?
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
jolshan commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1520542402 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -36,40 +37,79 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { val topicName = "foo" var adminClient: Admin = _ + var setOldMessageFormat: Boolean = false @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) createTopic(topicName, 1, 1.toShort) -produceMessages() adminClient = Admin.create(Map[String, Object]( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers() ).asJava) } @AfterEach override def tearDown(): Unit = { +setOldMessageFormat = false Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testEarliestOffset(quorum: String): Unit = { -val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest()) -assertEquals(0, earliestOffset.offset()) + def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = { Review Comment: We can give the test a more descriptive name though, right?
[jira] [Created] (KAFKA-16365) AssignmentsManager mismanages completion notifications
Igor Soarez created KAFKA-16365: --- Summary: AssignmentsManager mismanages completion notifications Key: KAFKA-16365 URL: https://issues.apache.org/jira/browse/KAFKA-16365 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez Assignee: Igor Soarez When moving replicas between directories in the same broker, future replica promotion hinges on acknowledgment from the controller of a change in the directory assignment. ReplicaAlterLogDirsThread relies on AssignmentsManager for a completion notification of the directory assignment change. In its current form, under certain assignment scheduling, AssignmentsManager can both miss completion notifications and trigger them prematurely.
Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]
chia7712 commented on code in PR #15476: URL: https://github.com/apache/kafka/pull/15476#discussion_r1520520996 ## core/src/test/scala/integration/kafka/api/OffsetOfMaxTimestampTest.scala: ## @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package integration.kafka.api + +import kafka.api.IntegrationTestHarness +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{Admin, NewTopic, OffsetSpec} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util.{Collections, Properties} + +class OffsetOfMaxTimestampTest extends IntegrationTestHarness { + @AfterEach + override def tearDown(): Unit = { +TestUtils.shutdownServers(brokers) +super.tearDown() + } + + override def brokerCount: Int = 1 + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testWithNoCompression(quorum: String): Unit = { +test(false) + } + + private def test(useCompression: Boolean): Unit = { +val topicName: String = "OffsetOfMaxTimestampTest-" + System.currentTimeMillis() + +val admin: Admin = Admin.create(adminClientConfig) +try { + admin.createTopics(Collections.singletonList(new NewTopic(topicName, 1, 1.toShort) +.configs(Collections.singletonMap(TopicConfig.COMPRESSION_TYPE_CONFIG, if (useCompression) "gzip" else "none" + val props: Properties = new Properties + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, if (useCompression) "gzip" else "none") + val producer: KafkaProducer[String, String] = new 
KafkaProducer[String, String](props) + try { +val time: Long = 1 +producer.send(new ProducerRecord[String, String](topicName, 0, time + 100, null, "val20")) +producer.send(new ProducerRecord[String, String](topicName, 0, time + 400, null, "val15")) +producer.send(new ProducerRecord[String, String](topicName, 0, time + 250, null, "val15")) Review Comment: @jolshan I prefer to have the following test: https://github.com/apache/kafka/pull/15474#discussion_r1520206161
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
junrao commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1520506664 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -52,20 +53,24 @@ public class GetOffsetShellTest { private final int topicCount = 4; private final int offsetTopicPartitionCount = 4; private final ClusterInstance cluster; +private final String topicName = "topic"; public GetOffsetShellTest(ClusterInstance cluster) { this.cluster = cluster; } private String getTopicName(int i) { -return "topic" + i; +return topicName + i; } -public void setUp() { +@BeforeEach +public void before() { cluster.config().serverProperties().put("auto.create.topics.enable", false); cluster.config().serverProperties().put("offsets.topic.replication.factor", "1"); cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount)); +} +public void setUp() { Review Comment: Why do we need to split the logic between `before` and `setUp`? ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -82,19 +122,63 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { ).asJava, new ListOffsetsOptions()).all().get().get(tp) } - def produceMessages(): Unit = { + def produceMessagesInOneBatch(compressionType: String = "none"): Unit = { Review Comment: Could this method be private? Ditto for `produceMessagesInSeparateBatch`. 
## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ## @@ -263,13 +262,8 @@ public RecordsInfo info() { } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) { return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset); } else { -long shallowOffsetOfMaxTimestamp; -// Use the last offset when dealing with record batches -if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) -shallowOffsetOfMaxTimestamp = lastOffset; -else -shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp; -return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp); +// For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping +return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp); Review Comment: This has the same issue. The semantic for MAX_TIMESTAMP is the first offset with the max timestamp. So, if timestamp is TimestampType.LOG_APPEND_TIME, we need to use the baseOffset, instead of lastOffset. Also, could we remove the shallow part in RecordsInfo.shallowOffsetOfMaxTimestamp? 
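The "first offset with the max timestamp" semantic described in the comment above can be sketched on its own. This is a minimal illustration, not the `MemoryRecordsBuilder` implementation; the class and method names are hypothetical, and it assumes the records of a batch occupy consecutive offsets starting at `baseOffset`.

```java
public class MaxTimestampOffsetSketch {

    // Returns the offset of the first record that carries the maximum timestamp,
    // or -1 if the batch is empty.
    static long offsetOfMaxTimestamp(long baseOffset, long[] timestamps) {
        long maxTimestamp = Long.MIN_VALUE;
        long offset = -1L;
        for (int i = 0; i < timestamps.length; i++) {
            // Strict '>' keeps the first occurrence when several records tie.
            if (timestamps[i] > maxTimestamp) {
                maxTimestamp = timestamps[i];
                offset = baseOffset + i;
            }
        }
        return offset;
    }

    public static void main(String[] args) {
        // Create-time style batch: the middle record has the largest timestamp.
        System.out.println(offsetOfMaxTimestamp(0L, new long[] {100L, 999L, 200L})); // prints 1
        // Log-append-time style batch: all records share one timestamp,
        // so the first offset (the base offset) is the answer.
        System.out.println(offsetOfMaxTimestamp(40L, new long[] {500L, 500L, 500L})); // prints 40
    }
}
```

The second call shows why the base offset, rather than the last offset, is the natural answer when every record in a batch has the same timestamp.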
## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -82,19 +122,63 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { ).asJava, new ListOffsetsOptions()).all().get().get(tp) } - def produceMessages(): Unit = { + def produceMessagesInOneBatch(compressionType: String = "none"): Unit = { val records = Seq( new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L, -null, new Array[Byte](1)), +null, new Array[Byte](10)), new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L, -null, new Array[Byte](1)), +null, new Array[Byte](10)), new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L, -null, new Array[Byte](1)), +null, new Array[Byte](10)), ) -TestUtils.produceMessages(brokers, records, -1) +// create a producer with large linger.ms and enough batch.size (default is enough for three 10 bytes records), +// so that we can confirm all records will be accumulated in producer until we flush them into one batch. +val producer = createProducer( + plaintextBootstrapServers(brokers), + deliveryTimeoutMs = Int.MaxValue, + lingerMs = Int.MaxValue, + compressionType = compressionType) + +try { + val futures = records.map(producer.send) + producer.flush() + futures.foreach(_.get) +} finally { + producer.close() +} } - def generateConfigs: Seq[KafkaConfig] = -TestUtils.createBrokerConfigs(1, zkConnectOrNull).map(KafkaConfig.fromProps) + def produceMessagesInSeparateBatch(compressionType: String = "none"): Unit = { +val records = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L, +null, new Array[Byte](10))) +val records2 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L, + null, new Array[Byte](10))) +val records3 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L, + null, new Array[Byte](10))) + +val producer = createProducer( + plai
Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]
hgeraldino commented on PR #15506: URL: https://github.com/apache/kafka/pull/15506#issuecomment-1989568163 Thanks @chia7712 for your review. @gharris1727 does this look good to you? It's the final PR needed to consider the `WorkerSinkTaskTest` migration done.
[PR] KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (1/3) [kafka]
hgeraldino opened a new pull request, #15520: URL: https://github.com/apache/kafka/pull/15520 This is the last remaining Kafka Connect test that needs migration from PowerMock/EasyMock to Mockito. Following the same approach as the `WorkerSinkTaskTest` migration, which was done in three separate batches ([1](https://github.com/apache/kafka/pull/14663), [2](https://github.com/apache/kafka/pull/15313), [3](https://github.com/apache/kafka/pull/15316)), this PR contains roughly 1/3 of the total number of test methods, which should make the review "easier". As usual, I look forward to your comments and feedback @C0urante @gharris1727 @divijvaidya @clolov @mukkachaitanya ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1520495884 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; -if (!topicNamesList.isEmpty()) { -runnable.call(call, now); +return call; +} + +@SuppressWarnings({"MethodLength", "NPathComplexity"}) +private Map> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi( +final Collection topicNames, +DescribeTopicsOptions options +) { +final Map> topicFutures = new HashMap<>(topicNames.size()); +final ArrayList topicNamesList = new ArrayList<>(); +for (String topicName : topicNames) { +if (topicNameIsUnrepresentable(topicName)) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request.")); +topicFutures.put(topicName, future); +} else if (!topicFutures.containsKey(topicName)) { +topicFutures.put(topicName, new KafkaFutureImpl<>()); +topicNamesList.add(topicName); +} } + +if (topicNamesList.isEmpty()) { +return new HashMap<>(topicFutures); +} + +// First, we need to retrieve the node info. 
+DescribeClusterResult clusterResult = describeCluster(); +Map nodes; +try { +nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node)); +} catch (InterruptedException | ExecutionException e) { +completeAllExceptionally(topicFutures.values(), e.getCause()); +return new HashMap<>(topicFutures); +} + +final long now = time.milliseconds(); +Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), +new LeastLoadedNodeProvider()) { +Map pendingTopics = +topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) +.collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + +TopicDescription partiallyFinishedTopicDescription = null; +TopicDescription nextTopicDescription = null; + +@Override +DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { +DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() +.setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); +if (partiallyFinishedTopicDescription != null) { +request.setCursor(new DescribeTopicPartitionsRequestData.Cursor() +.setTopicName(partiallyFinishedTopicDescription.name()) + .setPartitionIndex(partiallyFinishedTopicDescription.partitions().size()) +); +} +return new DescribeTopicPartitionsRequest.Builder(request); +} + +@Override +void handleResponse(AbstractResponse abstractResponse) { +DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse; +DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor(); + +for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) { +String topicName = topic.name(); +Errors error = Errors.forCode(topic.errorCode()); + +KafkaFutureImpl future = topicFutures.get(topicName); + +if (error != Errors.NONE) { 
+future.completeExceptionally(error.exception()); +pendingTopics.remove(topicName); +if (responseCursor != null && responseCursor.topicName().equals(topicName)) { +responseCursor = null; +} +continue; +} + +TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes); + +if (partiallyFinishedTopicDescription != null && partiallyFinishedTopicDescription.name().equals(topicName)) { + partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions()); +continue; +} + +
[jira] [Commented] (KAFKA-16291) Mirrormaker2 wrong checkpoints
[ https://issues.apache.org/jira/browse/KAFKA-16291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825467#comment-17825467 ] Greg Harris commented on KAFKA-16291: - [~claudio.benfatto] That's a good idea, I agree that the default behavior isn't good enough in all scenarios and a configuration is needed. Since it includes a user configuration, and needs significant design work, this will need a KIP. I've opened KAFKA-16364 to track the work there, and you're welcome to assign yourself and draft a KIP. But just to temper your expectations here: > Offset translation guarantees zero-redelivery This is not possible given the asynchronous pattern used for offset translation. I think this can be true in an eventual-consistency sense: If the upstream consumer group is inactive for long enough (and lag < N), then translation could be exact. We can also use this as an opportunity to design an alternative to offset.lag.max=0 and the mirror source sync send semaphore[!] because even with a 100% retention solution on the MirrorCheckpointTask side, the MirrorSourceTask still drops syncs occasionally. 
> Mirrormaker2 wrong checkpoints > -- > > Key: KAFKA-16291 > URL: https://issues.apache.org/jira/browse/KAFKA-16291 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.6.1 > Environment: Mirrormaker2 version 3.6.1 running on docker containers >Reporter: Claudio Benfatto >Priority: Major > > I am running Mirrormaker2 with the following configuration: > {noformat} > clusters = fallingwaterfall, weatheredbase > sync.group.offsets.interval.seconds=30 > emit.checkpoints.interval.seconds=30 > offset.lag.max=0 > fallingwaterfall->weatheredbase.enabled = true > weatheredbase->fallingwaterfall.enabled = false > sync.group.offsets.enabled=true > emit.heartbeats.enabled=true > emit.checkpoints.enabled=true > emit.checkpoints.interval.seconds=30 > refresh.groups.enabled=true > refresh.groups.interval.seconds=30 > refresh.topics.enabled=true > sync.topic.configs.enabled=true > refresh.topics.interval.seconds=30 > sync.topic.acls.enabled = false > fallingwaterfall->weatheredbase.topics = storage-demo-.* > fallingwaterfall->weatheredbase.groups = storage-demo-.* > group.id=mirror-maker-fallingwaterfall-weatheredbase > consumer.group.id=mirror-maker-fallingwaterfall-weatheredbase > fallingwaterfall.consumer.isolation.level = read_committed > weatheredbase.producer.enable.idempotence = true > weatheredbase.producer.acks=all > weatheredbase.exactly.once.source.support = enabled > replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy > {noformat} > I am experiencing issues with the consumer group offset synchronisation. > I have a setup with a 12-partition topic, named *storage-demo-test,* a single > transactional producer to this topic and a consumer group, named > *storage-demo-test-cg,* consuming from it. 
> The consumer configuration is: > {code:java} > 'auto.offset.reset': 'earliest', > 'isolation.level': 'read_committed', > 'enable.auto.commit': False, {code} > and I'm committing the offsets explicitly and synchronously after each poll. > What I observed is that the synchronised offsets between the upstream and > downstream cluster for the *storage-demo-test-cg* are often wrong. > For example in the case of this checkpoint: > {code:java} > (1, 1708505669764) - 6252 - > CheckpointKey(consumer_group='storage-demo-test-cg', > topic='storage-demo-test', partition=5) - > CheckpointValue(upstream_offset=197532, downstream_offset=196300) {code} > We have a mismatch in the replicated messages: > {code:java} > [fallingwaterfall]# kcat -C -b0 -t storage-demo-test -p 5 -o 197532 -c 1 > Test message 1027-0 {code} > {code:java} > [weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196300 -c 1 > Test message 1015-9 {code} > In the Mirrormaker2 logs I see many of these messages: > {code:java} > mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - > [2024-02-21 09:02:18,534] TRACE [MirrorCheckpointConnector|task-0] > latestDownstreamOffset 196300 is larger than or equal to > convertedUpstreamOffset 196300 for TopicPartition storage-demo-test-5 > (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337) > mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - > [2024-02-21 09:02:01,557] DEBUG [MirrorCheckpointConnector|task-0] > translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): > Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, > upstreamOffset=196913, downstreamOffset=195683}) > (org.apache.kafka.connect.mirror.OffsetSyncStore:160) > mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - > [2024-02-21 09:02:01,557] TRACE [MirrorCheckpointCo
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1520498085 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; -if (!topicNamesList.isEmpty()) { -runnable.call(call, now); +return call; +} + +@SuppressWarnings({"MethodLength", "NPathComplexity"}) +private Map> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi( +final Collection topicNames, +DescribeTopicsOptions options +) { +final Map> topicFutures = new HashMap<>(topicNames.size()); +final ArrayList topicNamesList = new ArrayList<>(); +for (String topicName : topicNames) { +if (topicNameIsUnrepresentable(topicName)) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request.")); +topicFutures.put(topicName, future); +} else if (!topicFutures.containsKey(topicName)) { +topicFutures.put(topicName, new KafkaFutureImpl<>()); +topicNamesList.add(topicName); +} } + +if (topicNamesList.isEmpty()) { +return new HashMap<>(topicFutures); +} + +// First, we need to retrieve the node info. 
+DescribeClusterResult clusterResult = describeCluster(); +Map nodes; +try { +nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node)); +} catch (InterruptedException | ExecutionException e) { +completeAllExceptionally(topicFutures.values(), e.getCause()); +return new HashMap<>(topicFutures); +} + +final long now = time.milliseconds(); +Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), +new LeastLoadedNodeProvider()) { +Map pendingTopics = +topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) +.collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + +TopicDescription partiallyFinishedTopicDescription = null; +TopicDescription nextTopicDescription = null; + +@Override +DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { +DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() +.setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); +if (partiallyFinishedTopicDescription != null) { Review Comment: Because we send the topic list, and that list contains only the topics whose results we have not yet received, we can skip setting the cursor for topics that resume at partition 0. Such a topic will be the first one in the Topics field.
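To make the cursor handling in this review comment concrete, here is a minimal, self-contained sketch in plain Java. It does not use the real `DescribeTopicPartitionsRequestData` classes: `Cursor`, `Page`, `fakeBroker`, and `describeAll` are hypothetical stand-ins, and the paging rules are only my reading of this thread, not the broker's actual implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

class CursorPagingSketch {
    /** Hypothetical cursor: resume at this topic, starting from this partition index. */
    static final class Cursor {
        final String topic;
        final int partition;
        Cursor(String topic, int partition) { this.topic = topic; this.partition = partition; }
    }

    /** Hypothetical response page: partitions served per topic, plus where to resume (null when done). */
    static final class Page {
        final Map<String, List<Integer>> partitions;
        final Cursor next;
        Page(Map<String, List<Integer>> partitions, Cursor next) { this.partitions = partitions; this.next = next; }
    }

    /** Stand-in for the broker: serves at most `limit` (>= 1) partitions of the requested topics, in sorted order. */
    static Page fakeBroker(Map<String, Integer> cluster, TreeSet<String> topics, Cursor cursor, int limit) {
        Map<String, List<Integer>> served = new LinkedHashMap<>();
        int count = 0;
        for (String topic : topics) {
            // Only the cursor topic resumes mid-way; every other topic starts at partition 0.
            int start = (cursor != null && cursor.topic.equals(topic)) ? cursor.partition : 0;
            for (int p = start; p < cluster.get(topic); p++) {
                if (count == limit) {
                    return new Page(served, new Cursor(topic, p));
                }
                served.computeIfAbsent(topic, t -> new ArrayList<>()).add(p);
                count++;
            }
        }
        return new Page(served, null);
    }

    /** Client loop: keeps requesting until every pending topic has been fully described. */
    static Map<String, List<Integer>> describeAll(Map<String, Integer> cluster, int limit) {
        TreeSet<String> pending = new TreeSet<>(cluster.keySet());
        Map<String, List<Integer>> result = new TreeMap<>();
        String partialTopic = null;
        while (!pending.isEmpty()) {
            // A cursor is only needed when we already hold some partitions of a topic.
            Cursor cursor = partialTopic == null
                ? null
                : new Cursor(partialTopic, result.get(partialTopic).size());
            Page page = fakeBroker(cluster, pending, cursor, limit);
            page.partitions.forEach((t, ps) -> result.computeIfAbsent(t, k -> new ArrayList<>()).addAll(ps));
            if (page.next == null) {
                pending.clear();
            } else {
                // Everything sorted before the cursor topic is complete and leaves the pending list.
                pending.headSet(page.next.topic).clear();
                // A topic that resumes at partition 0 is simply first in `pending`: no cursor needed.
                partialTopic = page.next.partition > 0 ? page.next.topic : null;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> cluster = new TreeMap<>();
        cluster.put("a", 3);
        cluster.put("b", 2);
        cluster.put("c", 1);
        System.out.println(describeAll(cluster, 2));
    }
}
```

The sorted `TreeSet` plays the role of the `TreeMap` of pending topics in the PR: because completed topics are dropped from it, a topic that has not been started yet always sits at the front of the next request.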
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1520495884 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; -if (!topicNamesList.isEmpty()) { -runnable.call(call, now); +return call; +} + +@SuppressWarnings({"MethodLength", "NPathComplexity"}) +private Map> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi( +final Collection topicNames, +DescribeTopicsOptions options +) { +final Map> topicFutures = new HashMap<>(topicNames.size()); +final ArrayList topicNamesList = new ArrayList<>(); +for (String topicName : topicNames) { +if (topicNameIsUnrepresentable(topicName)) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request.")); +topicFutures.put(topicName, future); +} else if (!topicFutures.containsKey(topicName)) { +topicFutures.put(topicName, new KafkaFutureImpl<>()); +topicNamesList.add(topicName); +} } + +if (topicNamesList.isEmpty()) { +return new HashMap<>(topicFutures); +} + +// First, we need to retrieve the node info. 
+DescribeClusterResult clusterResult = describeCluster(); +Map nodes; +try { +nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node)); +} catch (InterruptedException | ExecutionException e) { +completeAllExceptionally(topicFutures.values(), e.getCause()); +return new HashMap<>(topicFutures); +} + +final long now = time.milliseconds(); +Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), +new LeastLoadedNodeProvider()) { +Map pendingTopics = +topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) +.collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + +TopicDescription partiallyFinishedTopicDescription = null; +TopicDescription nextTopicDescription = null; + +@Override +DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { +DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() +.setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); +if (partiallyFinishedTopicDescription != null) { +request.setCursor(new DescribeTopicPartitionsRequestData.Cursor() +.setTopicName(partiallyFinishedTopicDescription.name()) + .setPartitionIndex(partiallyFinishedTopicDescription.partitions().size()) +); +} +return new DescribeTopicPartitionsRequest.Builder(request); +} + +@Override +void handleResponse(AbstractResponse abstractResponse) { +DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse; +DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor(); + +for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) { +String topicName = topic.name(); +Errors error = Errors.forCode(topic.errorCode()); + +KafkaFutureImpl future = topicFutures.get(topicName); + +if (error != Errors.NONE) { 
+future.completeExceptionally(error.exception()); +pendingTopics.remove(topicName); +if (responseCursor != null && responseCursor.topicName().equals(topicName)) { +responseCursor = null; +} +continue; +} + +TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes); + +if (partiallyFinishedTopicDescription != null && partiallyFinishedTopicDescription.name().equals(topicName)) { + partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions()); +continue; +} + +
[jira] [Created] (KAFKA-16364) MM2 High-Resolution Offset Translation
Greg Harris created KAFKA-16364: --- Summary: MM2 High-Resolution Offset Translation Key: KAFKA-16364 URL: https://issues.apache.org/jira/browse/KAFKA-16364 Project: Kafka Issue Type: New Feature Components: mirrormaker Reporter: Greg Harris The current OffsetSyncStore implementation [https://github.com/apache/kafka/blob/8b72a2c72f09838fdd2e7416c98d30fe876b4078/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L57] stores a sparse index of offset syncs. This attempts to strike a balanced default behavior between offset translation availability, memory usage, and throughput on the offset syncs topic. However, this balanced default behavior is not good enough in all circumstances. When precise offset translation is needed away from the end of the topic, such as for consumer groups with persistent lag, offset translation can be more precise. Users should have a way to configure high-precision offset translation, either through additional memory usage or re-reading the offset syncs topic. -- This message was sent by Atlassian Jira (v8.20.10#820010)
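As a rough illustration of why a sparse index limits precision, the sketch below models translation against the nearest stored sync at or below the consumer's upstream offset. It is a simplified model, not the actual `OffsetSyncStore` code: the class and method names are hypothetical. The `+1` step is inferred from the DEBUG log quoted in KAFKA-16291 above, where upstream offset 197532 is translated to 195684 relative to the sync (196913 -> 195683). Returning an empty result when no sync is old enough is a choice made for this sketch.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

class SparseOffsetTranslationSketch {
    // upstream offset -> downstream offset, for the syncs the store still remembers
    private final TreeMap<Long, Long> syncs = new TreeMap<>();

    void addSync(long upstreamOffset, long downstreamOffset) {
        syncs.put(upstreamOffset, downstreamOffset);
    }

    /** Translate an upstream consumer offset using the closest sync at or below it. */
    OptionalLong translate(long upstreamOffset) {
        Map.Entry<Long, Long> sync = syncs.floorEntry(upstreamOffset);
        if (sync == null) {
            // No remembered sync is old enough to anchor the translation.
            return OptionalLong.empty();
        }
        // An exact match translates exactly; anything later is only known to be
        // "at least one record past the sync", however far ahead it really is.
        long step = sync.getKey() == upstreamOffset ? 0 : 1;
        return OptionalLong.of(sync.getValue() + step);
    }

    public static void main(String[] args) {
        SparseOffsetTranslationSketch store = new SparseOffsetTranslationSketch();
        store.addSync(196913L, 195683L); // sync taken from the KAFKA-16291 log
        System.out.println(store.translate(197532L)); // group is 619 records past the sync upstream

        // Hypothetical denser sync (invented numbers, for illustration only).
        store.addSync(197500L, 196270L);
        System.out.println(store.translate(197532L));
    }
}
```

With only the first sync, a group that is hundreds of records past it still translates to one record past the sync's downstream offset. Keeping more syncs in memory, or re-reading the offset syncs topic, narrows that gap, which is the trade-off this ticket proposes to make configurable.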
Re: [PR] KAFKA-16152: Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart [kafka]
mjsax commented on code in PR #15419: URL: https://github.com/apache/kafka/pull/15419#discussion_r1520482326 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -546,8 +546,9 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { data.setMemberEpoch(membershipManager.memberEpoch()); // InstanceId - only sent if has changed since the last heartbeat Review Comment: Did a follow up PR: https://github.com/apache/kafka/pull/15519
Re: [PR] MINOR: simplify consumer logic [kafka]
mjsax commented on PR #15519: URL: https://github.com/apache/kafka/pull/15519#issuecomment-1989525897 Follow up from https://github.com/apache/kafka/pull/15419/files#r1513841644
[PR] MINOR: simplify consumer logic [kafka]
mjsax opened a new pull request, #15519: URL: https://github.com/apache/kafka/pull/15519 For a static member, the `group.instance.id` cannot change.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
artemlivshits commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1520443872 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; -if (!topicNamesList.isEmpty()) { -runnable.call(call, now); +return call; +} + +@SuppressWarnings({"MethodLength", "NPathComplexity"}) +private Map> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi( +final Collection topicNames, +DescribeTopicsOptions options +) { +final Map> topicFutures = new HashMap<>(topicNames.size()); +final ArrayList topicNamesList = new ArrayList<>(); +for (String topicName : topicNames) { +if (topicNameIsUnrepresentable(topicName)) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request.")); +topicFutures.put(topicName, future); +} else if (!topicFutures.containsKey(topicName)) { +topicFutures.put(topicName, new KafkaFutureImpl<>()); +topicNamesList.add(topicName); +} } + +if (topicNamesList.isEmpty()) { +return new HashMap<>(topicFutures); +} + +// First, we need to retrieve the node info. 
+DescribeClusterResult clusterResult = describeCluster(); +Map nodes; +try { +nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node)); +} catch (InterruptedException | ExecutionException e) { +completeAllExceptionally(topicFutures.values(), e.getCause()); +return new HashMap<>(topicFutures); +} + +final long now = time.milliseconds(); +Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), +new LeastLoadedNodeProvider()) { +Map pendingTopics = +topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) +.collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + +TopicDescription partiallyFinishedTopicDescription = null; +TopicDescription nextTopicDescription = null; + +@Override +DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { +DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() +.setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); +if (partiallyFinishedTopicDescription != null) { +request.setCursor(new DescribeTopicPartitionsRequestData.Cursor() +.setTopicName(partiallyFinishedTopicDescription.name()) + .setPartitionIndex(partiallyFinishedTopicDescription.partitions().size()) +); +} +return new DescribeTopicPartitionsRequest.Builder(request); +} + +@Override +void handleResponse(AbstractResponse abstractResponse) { +DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse; +DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor(); + +for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) { +String topicName = topic.name(); +Errors error = Errors.forCode(topic.errorCode()); + +KafkaFutureImpl future = topicFutures.get(topicName); + +if (error != Errors.NONE) { 
+future.completeExceptionally(error.exception()); +pendingTopics.remove(topicName); +if (responseCursor != null && responseCursor.topicName().equals(topicName)) { +responseCursor = null; +} +continue; +} + +TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes); + +if (partiallyFinishedTopicDescription != null && partiallyFinishedTopicDescription.name().equals(topicName)) { + partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions()); +continue; +} + +
[jira] [Comment Edited] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825454#comment-17825454 ] Greg Harris edited comment on KAFKA-15841 at 3/11/24 9:46 PM: -- [~henriquemota] Okay I think i understand better what you're trying to achieve. > ... one topic per table... > We have a JDBC Sink for each table. Okay, you're using scenario (1), one connector per-topic, which should come to at most 90 * 100 = 9000 connectors per Connect cluster. That is certainly too many to fit on a single machine, and certainly needs a cluster to distribute the work. In this scenario, Connect should be able to distribute approximately 9000/M connectors and 9000/M tasks to each of the M workers in a distributed cluster, barring any other practical limits/timeouts that i'm not aware of, so check for ERROR messages. > We tried to change the 'topics' property in the configurations using the > 'taskConfigs(int maxTasks)' method, but Kafka Connect ignores this property > when it is returned by 'taskConfigs(int maxTasks)'. The reason it does this is because the `topics` property is passed to the consumers to have them subscribe to the input topics, and the Consumer/Connect processing model has this subscription be the same for all consumers. This doesn't mean that every consumer is consuming every topic, however. Having a uniform subscription across all of the consumers in a group tells the consumers to assign the work among themselves, assigning the topic-partitions to each of the consumers according to the configured assignor. As an example, say your connector config had `topics=a,b`, and these two topics had 2 partitions, and tasks.max=2. The `topics` configs for both task-0 and task-1 would both be `a,b`, but the 4 partitions could be distributed like this by the consumer partition assignor: task-0: a-0, b-0 task-1: a-1, b-1 Or any permutation. 
This is where the assignor I mentioned is important; The RangeAssignor can generate some pretty unbalanced assignments: [https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html] If you choose a different assignor (RoundRobin, Sticky, etc), then you can switch to scenario (2), with one connector per client, and some tasks.max around 10. This would give you ~90 connectors with 900 tasks, each working on 10 topics. You can tune tasks.max up and down if you need more throughput or want less consumer/task overhead. was (Author: gharris1727): [~henriquemota] Okay I think i understand better what you're trying to achieve. > ... one topic per table... > We have a JDBC Sink for each table. Okay, you're using scenario (1), one connector per-topic, which should come to at most 90 * 100 = 9000 connectors per Connect cluster. That is certainly too many to fit on a single machine, and certainly needs a cluster to distribute the work. In this scenario, Connect should be able to distribute approximately 9000/M connectors and 9000/M tasks to each of the M workers in a distributed cluster, barring any other practical limits/timeouts that i'm not aware of, so check for ERROR messages. > We tried to change the 'topics' property in the configurations using the > 'taskConfigs(int maxTasks)' method, but Kafka Connect ignores this property > when it is returned by 'taskConfigs(int maxTasks)'. The reason it does this is because the `topics` property is passed to the consumers to have them subscribe to the input topics, and the Consumer/Connect processing model has this subscription be the same for all consumers. This doesn't mean that every consumer is consuming every topic, however. Having a uniform subscription across all of the consumers in a group tells the consumers to assign the work among themselves, assigning the topic-partitions to each of the consumers according to the configured assignor. 
As an example, say your connector config had `topics=a,b`, and these two topics had 2 partitions, and tasks.max=2. The `topics` configs for both task-0 and task-1 would both be `a,b`, but the 4 partitions could be distributed like this by the consumer partition assignor: task-0: a-0, b-0 task-1: a-1, b-1 Or any permutation. This is where the partitioner I mentioned is important; The RangeAssignor can generate some pretty unbalanced assignments: [https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html] If you choose a different assignor (RoundRobin, Sticky, etc), then you can switch to scenario (2), with one connector per client, and some tasks.max around 10. This would give you ~90 connectors with 900 tasks, each working on 10 topics. Tou can tune tasks.max up and down if you need more throughput or want less consumer/task overhead. > Add Support for Topic-Level Partitioning in Kafka Connect > ---
[jira] [Commented] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825454#comment-17825454 ] Greg Harris commented on KAFKA-15841: - [~henriquemota] Okay I think i understand better what you're trying to achieve. > ... one topic per table... > We have a JDBC Sink for each table. Okay, you're using scenario (1), one connector per-topic, which should come to at most 90 * 100 = 9000 connectors per Connect cluster. That is certainly too many to fit on a single machine, and certainly needs a cluster to distribute the work. In this scenario, Connect should be able to distribute approximately 9000/M connectors and 9000/M tasks to each of the M workers in a distributed cluster, barring any other practical limits/timeouts that i'm not aware of, so check for ERROR messages. > We tried to change the 'topics' property in the configurations using the > 'taskConfigs(int maxTasks)' method, but Kafka Connect ignores this property > when it is returned by 'taskConfigs(int maxTasks)'. The reason it does this is because the `topics` property is passed to the consumers to have them subscribe to the input topics, and the Consumer/Connect processing model has this subscription be the same for all consumers. This doesn't mean that every consumer is consuming every topic, however. Having a uniform subscription across all of the consumers in a group tells the consumers to assign the work among themselves, assigning the topic-partitions to each of the consumers according to the configured assignor. As an example, say your connector config had `topics=a,b`, and these two topics had 2 partitions, and tasks.max=2. The `topics` configs for both task-0 and task-1 would both be `a,b`, but the 4 partitions could be distributed like this by the consumer partition assignor: task-0: a-0, b-0 task-1: a-1, b-1 Or any permutation. 
This is where the assignor I mentioned is important: the RangeAssignor can generate some pretty unbalanced assignments: [https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html] If you choose a different assignor (RoundRobin, Sticky, etc), then you can switch to scenario (2), with one connector per client, and some tasks.max around 10. This would give you ~90 connectors with 900 tasks, each working on 10 topics. You can tune tasks.max up and down if you need more throughput or want less consumer/task overhead. > Add Support for Topic-Level Partitioning in Kafka Connect > - > > Key: KAFKA-15841 > URL: https://issues.apache.org/jira/browse/KAFKA-15841 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Henrique Mota >Priority: Trivial > Attachments: image-2024-02-19-13-48-55-875.png > > > In our organization, we utilize JDBC sink connectors to consume data from > various topics, where each topic is dedicated to a specific tenant with a > single partition. Recently, we developed a custom sink based on the standard > JDBC sink, enabling us to pause consumption of a topic when encountering > problematic records. > However, we face limitations within Kafka Connect, as it doesn't allow for > appropriate partitioning of topics among workers. We attempted a workaround > by breaking down the topics list within the 'topics' parameter. > Unfortunately, Kafka Connect overrides this parameter after invoking the > {{taskConfigs(int maxTasks)}} method from the > {{org.apache.kafka.connect.connector.Connector}} class. > We request the addition of support in Kafka Connect to enable the > partitioning of topics among workers without requiring a fork. This > enhancement would facilitate better load distribution and allow for more > flexible configurations, particularly in scenarios where topics are dedicated > to different tenants.
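To illustrate why the choice of assignor matters for the single-partition-per-tenant layout described in this ticket, here is a small simulation in plain Java. It does not call Kafka's assignor classes: `rangeStyle`, `roundRobinStyle`, and `singlePartitionTopics` are hypothetical helpers that only mimic the documented per-topic (range) and across-all-partitions (round-robin) behaviour, and the `tenant-N` topic names are invented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class AssignorSketch {
    private static List<List<String>> emptyAssignment(int tasks) {
        List<List<String>> out = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            out.add(new ArrayList<>());
        }
        return out;
    }

    /** Range-style: each topic is split on its own, so low-numbered tasks always come first. */
    static List<List<String>> rangeStyle(TreeMap<String, Integer> topics, int tasks) {
        List<List<String>> out = emptyAssignment(tasks);
        for (Map.Entry<String, Integer> e : topics.entrySet()) {
            int perTask = e.getValue() / tasks;
            int extra = e.getValue() % tasks; // the first `extra` tasks get one more partition
            int next = 0;
            for (int t = 0; t < tasks; t++) {
                int count = perTask + (t < extra ? 1 : 0);
                for (int i = 0; i < count; i++) {
                    out.get(t).add(e.getKey() + "-" + next++);
                }
            }
        }
        return out;
    }

    /** Round-robin-style: all topic-partitions are laid out in order and dealt out in turn. */
    static List<List<String>> roundRobinStyle(TreeMap<String, Integer> topics, int tasks) {
        List<List<String>> out = emptyAssignment(tasks);
        int slot = 0;
        for (Map.Entry<String, Integer> e : topics.entrySet()) {
            for (int p = 0; p < e.getValue(); p++) {
                out.get(slot++ % tasks).add(e.getKey() + "-" + p);
            }
        }
        return out;
    }

    /** Hypothetical tenant layout: `n` topics with one partition each. */
    static TreeMap<String, Integer> singlePartitionTopics(int n) {
        TreeMap<String, Integer> topics = new TreeMap<>();
        for (int i = 0; i < n; i++) {
            topics.put("tenant-" + i, 1);
        }
        return topics;
    }

    public static void main(String[] args) {
        TreeMap<String, Integer> ab = new TreeMap<>();
        ab.put("a", 2);
        ab.put("b", 2);
        System.out.println("range, topics=a,b:        " + rangeStyle(ab, 2));

        TreeMap<String, Integer> tenants = singlePartitionTopics(10);
        System.out.println("range, 10 tenants:        " + rangeStyle(tenants, 2));
        System.out.println("round-robin, 10 tenants:  " + roundRobinStyle(tenants, 2));
    }
}
```

For `topics=a,b` with two partitions each, the range-style split reproduces the `task-0: a-0, b-0` / `task-1: a-1, b-1` layout from the comment. With ten single-partition topics, the same rule hands every partition to the first task, while the round-robin-style rule splits them five and five. In a real deployment the assignor would probably be selected through the consumer's `partition.assignment.strategy` setting (for a connector, likely via a `consumer.override.` prefixed property, if the worker's client override policy permits it); treat that as an assumption to check against the Connect documentation.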
Re: [PR] KAFKA-16345: Optionally URL-encode clientID and clientSecret in authorization header [kafka]
bachmanity1 commented on PR #15475: URL: https://github.com/apache/kafka/pull/15475#issuecomment-1989425275 @kirktrue @mimaison kind reminder. Could you please have a look at the KIP?
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1520418403 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/MemberState.java: ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import java.util.HashMap; +import java.util.Map; + +/** + * The various states that a member can be in. For their definition, + * refer to the documentation of {{@link CurrentAssignmentBuilder}}. Review Comment: Is this still the case? We removed a lot of the javadoc there.
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1520383812 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -33,49 +33,6 @@ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. - * - * The member state has the following properties: - * - Current Epoch: Review Comment: Do we have a replacement summary for this java doc? I noticed there were more comments in the code, but not sure if they covered everything. EDIT: I see some of the descriptions in the new MemberState class.
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1520414314 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -170,72 +127,122 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions( * @return A new ConsumerGroupMember or the current one. */ public ConsumerGroupMember build() { -// A new target assignment has been installed, we need to restart -// the reconciliation loop from the beginning. -if (targetAssignmentEpoch != member.targetMemberEpoch()) { -return transitionToNewTargetAssignmentState(); -} - switch (member.state()) { -// Check if the partitions have been revoked by the member. -case REVOKING: -return maybeTransitionFromRevokingToAssigningOrStable(); +case STABLE: +// When the member is in the STABLE state, we verify if a newer +// epoch (or target assignment) is available. If it is, we can +// reconcile the member towards it. Otherwise, we return. +if (member.memberEpoch() != targetAssignmentEpoch) { +return computeNextAssignment( +member.memberEpoch(), +member.assignedPartitions() +); +} else { +return member; +} -// Check if pending partitions have been freed up. -case ASSIGNING: -return maybeTransitionFromAssigningToAssigningOrStable(); +case UNREVOKED_PARTITIONS: +// When the member is in the UNREVOKED_PARTITIONS state, we wait +// until the member has revoked the necessary partitions. They are +// considered revoked when they are not anymore reported in the +// owned partitions set in the ConsumerGroupHeartbeat API. -// Nothing to do. -case STABLE: -return member; +// If the member does not provide its owned partitions. We cannot +// progress. +if (ownedTopicPartitions == null) { +return member; +} + +// If the member provides its owned partitions. We verify if it still +// owns any of the revoked partitions. If it does, we cannot progress. 
+for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) { +for (Integer partitionId : topicPartitions.partitions()) { +boolean stillHasRevokedPartition = member +.partitionsPendingRevocation() +.getOrDefault(topicPartitions.topicId(), Collections.emptySet()) +.contains(partitionId); +if (stillHasRevokedPartition) { +return member; +} +} +} + +// When the member has revoked all the pending partitions, it can +// transition to the next epoch (current + 1) and we can reconcile +// its state towards the latest target assignment. +return computeNextAssignment( +member.memberEpoch() + 1, +member.assignedPartitions() +); + +case UNRELEASED_PARTITIONS: +// When the member is in the UNRELEASED_PARTITIONS, we reconcile the +// member towards the latest target assignment. This will assign any +// of the unreleased partitions when they become available. +return computeNextAssignment( +member.memberEpoch(), +member.assignedPartitions() +); + +case UNKNOWN: +// We could only end up in this state if a new state is added in the +// future and the group coordinator is downgraded. In this case, the +// best option is to fence the member to force it to rejoin the group +// without any partitions and to reconcile it again from scratch. +if (ownedTopicPartitions == null || !ownedTopicPartitions.isEmpty()) { +throw new FencedMemberEpochException("The consumer group member is in a unknown state. " ++ "The member must abandon all its partitions and rejoin."); +} + +return computeNextAssignment( Review Comment: Is the idea we only hit this case below on restart? Ie, we fence and force the member out of the group, but it will still be unknown on rejoining (just with no owned partitions) -- This is an automated message fro
[jira] [Commented] (KAFKA-16356) Remove class-name dispatch in RemoteLogMetadataSerde
[ https://issues.apache.org/jira/browse/KAFKA-16356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825439#comment-17825439 ] Greg Harris commented on KAFKA-16356: - [~linu] I have given you permissions to assign tickets. You can assign this ticket and begin working on it when you have time. Thanks for your interest in Kafka! > Remove class-name dispatch in RemoteLogMetadataSerde > > > Key: KAFKA-16356 > URL: https://issues.apache.org/jira/browse/KAFKA-16356 > Project: Kafka > Issue Type: Task > Components: Tiered-Storage >Affects Versions: 3.7.0 >Reporter: Greg Harris >Priority: Trivial > Labels: newbie > > The RemoteLogMetadataSerde#serialize receives a RemoteLogMetadata object, and > has to dispatch to one of four serializers depending on its type. This is > done by taking the class name of the RemoteLogMetadata and looking it up in > maps to find the corresponding serializer for that class. > This later requires an unchecked cast, because the RemoteLogMetadataTransform > is generic. This is all type-unsafe, and can be replaced with type-safe > if-elseif-else statements that may also be faster than the double-indirect > map lookups.
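The refactoring this ticket describes could look roughly like the sketch below. All types here are hypothetical stand-ins (`Metadata`, `SegmentMetadata`, `Transform`, and so on), not the real `RemoteLogMetadata` hierarchy or `RemoteLogMetadataTransform` API. The point is only the shape of the dispatch: an `instanceof` chain with checked casts in place of a class-name map lookup followed by an unchecked cast.

```java
class TypeSafeDispatchSketch {
    // Hypothetical stand-ins for the four metadata types mentioned in the ticket.
    static abstract class Metadata {}
    static final class SegmentMetadata extends Metadata {}
    static final class SegmentUpdate extends Metadata {}
    static final class PartitionDelete extends Metadata {}
    static final class SegmentSnapshot extends Metadata {}

    /** Hypothetical generic transform; a String result keeps the sketch self-contained. */
    interface Transform<T extends Metadata> {
        String toApiMessage(T metadata);
    }

    static final Transform<SegmentMetadata> SEGMENT = m -> "segment";
    static final Transform<SegmentUpdate> UPDATE = m -> "update";
    static final Transform<PartitionDelete> DELETE = m -> "delete";
    static final Transform<SegmentSnapshot> SNAPSHOT = m -> "snapshot";

    /** Each branch pairs a checked cast with a transform of the matching type parameter. */
    static String serialize(Metadata metadata) {
        if (metadata instanceof SegmentMetadata) {
            return SEGMENT.toApiMessage((SegmentMetadata) metadata);
        } else if (metadata instanceof SegmentUpdate) {
            return UPDATE.toApiMessage((SegmentUpdate) metadata);
        } else if (metadata instanceof PartitionDelete) {
            return DELETE.toApiMessage((PartitionDelete) metadata);
        } else if (metadata instanceof SegmentSnapshot) {
            return SNAPSHOT.toApiMessage((SegmentSnapshot) metadata);
        } else {
            throw new IllegalArgumentException("Unsupported metadata type: " + metadata.getClass());
        }
    }

    public static void main(String[] args) {
        System.out.println(serialize(new SegmentUpdate()));
    }
}
```

Because every cast is guarded by the matching `instanceof`, the compiler needs no `@SuppressWarnings("unchecked")`, and an unsupported subtype fails with an explicit exception instead of a missing map entry.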
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1520383812 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -33,49 +33,6 @@ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. - * - * The member state has the following properties: - * - Current Epoch: Review Comment: Do we have a replacement summary for this java doc? I noticed there were more comments in the code, but not sure if they covered everything.
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1520381484 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ## @@ -528,27 +478,6 @@ public Map<Uuid, Set<Integer>> partitionsPendingRevocation() { return partitionsPendingRevocation; } -/** - * @return The set of partitions awaiting assignment to the member. - */ -public Map<Uuid, Set<Integer>> partitionsPendingAssignment() { -return partitionsPendingAssignment; -} - -/** - * @return A string representation of the current assignment state. - */ -public String currentAssignmentSummary() { Review Comment: Did we get rid of this because it is not used anywhere?
Re: [PR] MINOR: Tweak streams config doc [kafka]
cherylws commented on PR #15518: URL: https://github.com/apache/kafka/pull/15518#issuecomment-1989345872 @mjsax
Re: [PR] KAFKA-16295: Align RocksDB and in-memory store init() sequences [kafka]
wcarlson5 commented on code in PR #15421: URL: https://github.com/apache/kafka/pull/15421#discussion_r1520361134 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ## @@ -170,9 +170,6 @@ public void init(final ProcessorContext context, @Override public void init(final StateStoreContext context, final StateStore root) { -// open the DB dir -metricsRecorder.init(getMetricsImpl(context), context.taskId()); -openDB(context.appConfigs(), context.stateDir()); Review Comment: Can we get one more line deleted here?
[PR] Minor: Tweak streams config doc [kafka]
cherylws opened a new pull request, #15518: URL: https://github.com/apache/kafka/pull/15518 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[PR] MINOR: Kafka Streams docs fixes [kafka]
mjsax opened a new pull request, #15517: URL: https://github.com/apache/kafka/pull/15517 - add missing section to TOC - add default value for client.id
Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]
jolshan commented on code in PR #15476: URL: https://github.com/apache/kafka/pull/15476#discussion_r1520300581 ## core/src/test/scala/integration/kafka/api/OffsetOfMaxTimestampTest.scala: ## @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package integration.kafka.api + +import kafka.api.IntegrationTestHarness +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{Admin, NewTopic, OffsetSpec} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util.{Collections, Properties} + +class OffsetOfMaxTimestampTest extends IntegrationTestHarness { + @AfterEach + override def tearDown(): Unit = { +TestUtils.shutdownServers(brokers) +super.tearDown() + } + + override def brokerCount: Int = 1 + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testWithNoCompression(quorum: String): Unit = { +test(false) + } + + private def test(useCompression: Boolean): Unit = { +val topicName: String = "OffsetOfMaxTimestampTest-" + System.currentTimeMillis() + +val admin: Admin = Admin.create(adminClientConfig) +try { + admin.createTopics(Collections.singletonList(new NewTopic(topicName, 1, 1.toShort) +.configs(Collections.singletonMap(TopicConfig.COMPRESSION_TYPE_CONFIG, if (useCompression) "gzip" else "none" + val props: Properties = new Properties + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, if (useCompression) "gzip" else "none") + val producer: KafkaProducer[String, String] = new 
KafkaProducer[String, String](props) + try { +val time: Long = 1 +producer.send(new ProducerRecord[String, String](topicName, 0, time + 100, null, "val20")) +producer.send(new ProducerRecord[String, String](topicName, 0, time + 400, null, "val15")) +producer.send(new ProducerRecord[String, String](topicName, 0, time + 250, null, "val15")) Review Comment: For the non-compression case, we can just make the record size 10 B (rather than 10KB). There is also some discussion about how to control the batch size via flush
Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]
jolshan commented on code in PR #15476: URL: https://github.com/apache/kafka/pull/15476#discussion_r1520299410 ## core/src/test/scala/integration/kafka/api/OffsetOfMaxTimestampTest.scala: ## @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package integration.kafka.api + +import kafka.api.IntegrationTestHarness +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{Admin, NewTopic, OffsetSpec} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util.{Collections, Properties} + +class OffsetOfMaxTimestampTest extends IntegrationTestHarness { + @AfterEach + override def tearDown(): Unit = { +TestUtils.shutdownServers(brokers) +super.tearDown() + } + + override def brokerCount: Int = 1 + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testWithNoCompression(quorum: String): Unit = { +test(false) + } + + private def test(useCompression: Boolean): Unit = { +val topicName: String = "OffsetOfMaxTimestampTest-" + System.currentTimeMillis() + +val admin: Admin = Admin.create(adminClientConfig) +try { + admin.createTopics(Collections.singletonList(new NewTopic(topicName, 1, 1.toShort) +.configs(Collections.singletonMap(TopicConfig.COMPRESSION_TYPE_CONFIG, if (useCompression) "gzip" else "none" + val props: Properties = new Properties + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, if (useCompression) "gzip" else "none") + val producer: KafkaProducer[String, String] = new 
KafkaProducer[String, String](props) + try { +val time: Long = 1 +producer.send(new ProducerRecord[String, String](topicName, 0, time + 100, null, "val20")) +producer.send(new ProducerRecord[String, String](topicName, 0, time + 400, null, "val15")) +producer.send(new ProducerRecord[String, String](topicName, 0, time + 250, null, "val15")) Review Comment: Do we need a separate test for this? I mentioned how to test here: https://github.com/apache/kafka/pull/15474#discussion_r1516625703
[PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]
mimaison opened a new pull request, #15516: URL: https://github.com/apache/kafka/pull/15516 Based on https://github.com/apache/kafka/pull/5927 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] dummy PR to check acls [kafka]
JobseRyan closed pull request #15515: dummy PR to check acls URL: https://github.com/apache/kafka/pull/15515
[PR] dummy PR to check acls [kafka]
JobseRyan opened a new pull request, #15515: URL: https://github.com/apache/kafka/pull/15515 dummy PR ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] User/rjobse/add xinfra identifier check [kafka]
JobseRyan closed pull request #15514: User/rjobse/add xinfra identifier check URL: https://github.com/apache/kafka/pull/15514
[PR] User/rjobse/add xinfra identifier check [kafka]
JobseRyan opened a new pull request, #15514: URL: https://github.com/apache/kafka/pull/15514 Pull request to add a check for the client type and library version to check if the client which is accessing this broker is a xinfra client ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1520206161 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -36,40 +37,79 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { val topicName = "foo" var adminClient: Admin = _ + var setOldMessageFormat: Boolean = false @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) createTopic(topicName, 1, 1.toShort) -produceMessages() adminClient = Admin.create(Map[String, Object]( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers() ).asJava) } @AfterEach override def tearDown(): Unit = { +setOldMessageFormat = false Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testEarliestOffset(quorum: String): Unit = { -val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest()) -assertEquals(0, earliestOffset.offset()) + def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = { Review Comment: How about using `CsvSource` to list all cases? 
for example:
```scala
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@CsvSource(Array(
  "zk, true, true, true",
  "zk, true, true, false",
  "zk, true, false, true",
  "zk, true, false, false",
  "zk, false, true, true",
  "zk, false, true, false",
  //"zk, false, false, true", KAFKA-16341 will enable it
  "zk, false, false, false",
  "kraft, true, false, true",
  "kraft, true, false, false",
  //"kraft, false, false, true", KAFKA-16341 will enable it
  "kraft, false, false, false"
))
def test(quorum: String, compression: Boolean, oldMessage: Boolean, oneBatch: Boolean): Unit = {
  if (oldMessage) createOldMessageFormatBrokers()
  if (oneBatch) produceMessagesInOneBatch(if(compression) "gzip" else "none")
  else produceMessagesInSeparateBatch(if(compression) "gzip" else "none")
  verifyListOffsets()
}
```
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1520201655 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -230,8 +234,19 @@ private void emitNonJoinedOuterRecords( sharedTimeTracker.minTime = timestamp; // Skip next records if window has not closed -if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) { -break; +final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); +if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { +if (timestampedKeyAndJoinSide.isLeftSide()) { +outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side +} else { +outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side +} +if (outerJoinLeftBreak && outerJoinRightBreak) { Review Comment: See: https://github.com/apache/kafka/pull/15510
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1520191884 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ## @@ -37,4 +42,28 @@ public static OptionalInt ofSentinel(int value) { public static OptionalLong ofSentinel(long value) { return value != -1 ? OptionalLong.of(value) : OptionalLong.empty(); } + +/** + * @return The provided assignment as a String. Review Comment: Could we maybe include an example string and what its components are?
[jira] [Commented] (KAFKA-16363) Storage crashes if dir is unavailable
[ https://issues.apache.org/jira/browse/KAFKA-16363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825402#comment-17825402 ] Igor Soarez commented on KAFKA-16363: - cc [~pprovenzano] > Storage crashes if dir is unavailable > - > > Key: KAFKA-16363 > URL: https://issues.apache.org/jira/browse/KAFKA-16363 > Project: Kafka > Issue Type: Sub-task > Components: tools >Affects Versions: 3.7.0 >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Major > > The storage tool crashes if one of the configured log directories is > unavailable. > > {code:java} > sh-4.4# ./bin/kafka-storage.sh format --ignore-formatted -t $KAFKA_CLUSTER_ID > -c server.properties > [2024-03-11 17:51:05,391] ERROR Error while reading meta.properties file > /data/d2/meta.properties > (org.apache.kafka.metadata.properties.MetaPropertiesEnsemble) > java.nio.file.AccessDeniedException: /data/d2/meta.properties > at > java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:90) > at > java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106) > at > java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) > at > java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:218) > at java.base/java.nio.file.Files.newByteChannel(Files.java:380) > at java.base/java.nio.file.Files.newByteChannel(Files.java:432) > at > java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:422) > at java.base/java.nio.file.Files.newInputStream(Files.java:160) > at > org.apache.kafka.metadata.properties.PropertiesUtils.readPropertiesFile(PropertiesUtils.java:77) > at > org.apache.kafka.metadata.properties.MetaPropertiesEnsemble$Loader.load(MetaPropertiesEnsemble.java:135) > at kafka.tools.StorageTool$.formatCommand(StorageTool.scala:431) > at kafka.tools.StorageTool$.main(StorageTool.scala:95) > at kafka.tools.StorageTool.main(StorageTool.scala) > 
metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, > dirs={/data/d1: MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, > nodeId=101, directoryId=zm7fSw3zso9aR0AtuzsI_A), /data/metadata: > MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, nodeId=101, > directoryId=eRO8vOP7ddbpx_W2ZazjLw), /data/d2: ERROR}) > I/O error trying to read log directory /data/d2. > {code} > When configured with multiple directories, Kafka tolerates some of them (but > not all) being inaccessible, so this tool should be able to handle the same > scenarios without crashing.
[jira] [Assigned] (KAFKA-16363) Storage crashes if dir is unavailable
[ https://issues.apache.org/jira/browse/KAFKA-16363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reassigned KAFKA-16363: --- Assignee: Igor Soarez > Storage crashes if dir is unavailable > - > > Key: KAFKA-16363 > URL: https://issues.apache.org/jira/browse/KAFKA-16363 > Project: Kafka > Issue Type: Sub-task > Components: tools >Affects Versions: 3.7.0 >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Major > > The storage tool crashes if one of the configured log directories is > unavailable. > > {code:java} > sh-4.4# ./bin/kafka-storage.sh format --ignore-formatted -t $KAFKA_CLUSTER_ID > -c server.properties > [2024-03-11 17:51:05,391] ERROR Error while reading meta.properties file > /data/d2/meta.properties > (org.apache.kafka.metadata.properties.MetaPropertiesEnsemble) > java.nio.file.AccessDeniedException: /data/d2/meta.properties > at > java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:90) > at > java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106) > at > java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) > at > java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:218) > at java.base/java.nio.file.Files.newByteChannel(Files.java:380) > at java.base/java.nio.file.Files.newByteChannel(Files.java:432) > at > java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:422) > at java.base/java.nio.file.Files.newInputStream(Files.java:160) > at > org.apache.kafka.metadata.properties.PropertiesUtils.readPropertiesFile(PropertiesUtils.java:77) > at > org.apache.kafka.metadata.properties.MetaPropertiesEnsemble$Loader.load(MetaPropertiesEnsemble.java:135) > at kafka.tools.StorageTool$.formatCommand(StorageTool.scala:431) > at kafka.tools.StorageTool$.main(StorageTool.scala:95) > at kafka.tools.StorageTool.main(StorageTool.scala) > 
metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, > dirs={/data/d1: MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, > nodeId=101, directoryId=zm7fSw3zso9aR0AtuzsI_A), /data/metadata: > MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, nodeId=101, > directoryId=eRO8vOP7ddbpx_W2ZazjLw), /data/d2: ERROR}) > I/O error trying to read log directory /data/d2. > {code} > When configured with multiple directories, Kafka tolerates some of them (but > not all) being inaccessible, so this tool should be able to handle the same > scenarios without crashing.
[jira] [Created] (KAFKA-16363) Storage crashes if dir is unavailable
Igor Soarez created KAFKA-16363: --- Summary: Storage crashes if dir is unavailable Key: KAFKA-16363 URL: https://issues.apache.org/jira/browse/KAFKA-16363 Project: Kafka Issue Type: Sub-task Components: tools Affects Versions: 3.7.0 Reporter: Igor Soarez The storage tool crashes if one of the configured log directories is unavailable. {code:java} sh-4.4# ./bin/kafka-storage.sh format --ignore-formatted -t $KAFKA_CLUSTER_ID -c server.properties [2024-03-11 17:51:05,391] ERROR Error while reading meta.properties file /data/d2/meta.properties (org.apache.kafka.metadata.properties.MetaPropertiesEnsemble) java.nio.file.AccessDeniedException: /data/d2/meta.properties at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:90) at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106) at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) at java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:218) at java.base/java.nio.file.Files.newByteChannel(Files.java:380) at java.base/java.nio.file.Files.newByteChannel(Files.java:432) at java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:422) at java.base/java.nio.file.Files.newInputStream(Files.java:160) at org.apache.kafka.metadata.properties.PropertiesUtils.readPropertiesFile(PropertiesUtils.java:77) at org.apache.kafka.metadata.properties.MetaPropertiesEnsemble$Loader.load(MetaPropertiesEnsemble.java:135) at kafka.tools.StorageTool$.formatCommand(StorageTool.scala:431) at kafka.tools.StorageTool$.main(StorageTool.scala:95) at kafka.tools.StorageTool.main(StorageTool.scala) metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/data/d1: MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, nodeId=101, directoryId=zm7fSw3zso9aR0AtuzsI_A), /data/metadata: MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, nodeId=101, 
directoryId=eRO8vOP7ddbpx_W2ZazjLw), /data/d2: ERROR}) I/O error trying to read log directory /data/d2. {code} When configured with multiple directories, Kafka tolerates some of them (but not all) being inaccessible, so this tool should be able to handle the same scenarios without crashing.
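The behaviour requested in KAFKA-16363 (tolerate some, but not all, unreadable log directories) can be sketched as below. This is a minimal illustration using only `java.nio.file` and `java.util.Properties`; `TolerantDirLoader`, `Result`, and `loadAll` are hypothetical names, and the sketch does not reflect how StorageTool or MetaPropertiesEnsemble are actually implemented.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of Kafka: reads one properties file per directory
// and records failures instead of aborting on the first unreadable directory.
final class TolerantDirLoader {
    static final class Result {
        final Map<Path, Properties> loaded = new LinkedHashMap<>();
        final Map<Path, IOException> failed = new LinkedHashMap<>();
    }

    static Result loadAll(List<Path> dirs, String fileName) {
        Result result = new Result();
        for (Path dir : dirs) {
            // AccessDeniedException and NoSuchFileException are both IOExceptions,
            // so an inaccessible or missing directory lands in the catch block.
            try (InputStream in = Files.newInputStream(dir.resolve(fileName))) {
                Properties props = new Properties();
                props.load(in);
                result.loaded.put(dir, props);
            } catch (IOException e) {
                result.failed.put(dir, e);
            }
        }
        // Assumption: at least one directory must be readable, mirroring the
        // "some of them (but not all)" wording in the ticket.
        if (result.loaded.isEmpty() && !dirs.isEmpty()) {
            throw new IllegalStateException("No log directory is readable: " + result.failed.keySet());
        }
        return result;
    }
}
```

A caller could log the entries in `failed` as warnings and continue formatting the directories in `loaded`.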
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1520171958 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1796,12 +1834,12 @@ public void onLoaded() { consumerGroup.members().forEach((memberId, member) -> { log.debug("Loaded member {} in consumer group {}.", memberId, groupId); scheduleConsumerGroupSessionTimeout(groupId, memberId); -if (member.state() == ConsumerGroupMember.MemberState.REVOKING) { -scheduleConsumerGroupRevocationTimeout( +if (member.state() == MemberState.UNREVOKED_PARTITIONS) { +scheduleConsumerGroupRebalanceTimeout( groupId, -memberId, -member.rebalanceTimeoutMs(), -member.memberEpoch() +member.memberId(), Review Comment: did we change this from memberId to member.memberId just to be consistent with the other arguments?
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1520152684 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1400,35 +1440,35 @@ private void cancelConsumerGroupSessionTimeout( } /** - * Schedules a revocation timeout for the member. + * Schedules a rebalance timeout for the member. * * @param groupId The group id. * @param memberId The member id. - * @param revocationTimeoutMs The revocation timeout. - * @param expectedMemberEpoch The expected member epoch. + * @param memberEpoch The member epoch. + * @param rebalanceTimeoutMsThe rebalance timeout. */ -private void scheduleConsumerGroupRevocationTimeout( +private void scheduleConsumerGroupRebalanceTimeout( String groupId, String memberId, -long revocationTimeoutMs, -int expectedMemberEpoch +int memberEpoch, +int rebalanceTimeoutMs ) { -String key = consumerGroupRevocationTimeoutKey(groupId, memberId); -timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +String key = consumerGroupRebalanceTimeoutKey(groupId, memberId); +timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { try { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); -if (member.state() != ConsumerGroupMember.MemberState.REVOKING || -member.memberEpoch() != expectedMemberEpoch) { -log.debug("[GroupId {}] Ignoring revocation timeout for {} because the member " + -"state does not match the expected state.", groupId, memberId); +if (member.memberEpoch() == memberEpoch) { +log.info("[GroupId {}] Member {} fenced from the group because " + +"it failed to transition from epoch {} within {}ms.", +groupId, memberId, memberEpoch, rebalanceTimeoutMs); +return new CoordinatorResult<>(consumerGroupFenceMember(group, member)); Review Comment: So it seems like this is one of the main changes here -- we don't validate on 
the revoking state -- I guess we could be in a revoking state for the next assignment... I may have asked this on a previous PR, but are we assuming that the current member epoch of the member (not the one passed in) is never less than the member epoch passed into this method? That makes sense given the epoch is monotonically increasing, but just wanted to confirm. As an aside, when we fence a group member, do we basically kick it out of the group and force it to rejoin? Can the client rejoin without restarting?
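The epoch check discussed in this comment can be illustrated with the sketch below: the timeout callback captures the epoch at scheduling time and fences the member only if the epoch has not advanced when it fires. `EpochGuardedTimeout` and `Member` are hypothetical names, the timer is replaced by a plain `Runnable`, and this is not the GroupMetadataManager implementation.

```java
// Hypothetical illustration of an epoch-guarded timeout; not Kafka's actual code.
final class EpochGuardedTimeout {
    static final class Member {
        final String id;
        int epoch;
        boolean fenced;

        Member(String id, int epoch) {
            this.id = id;
            this.epoch = epoch;
        }
    }

    // Builds the callback a timer would run once the rebalance timeout expires.
    // scheduledEpoch is the member epoch observed when the timeout was scheduled.
    static Runnable rebalanceTimeout(Member member, int scheduledEpoch) {
        return () -> {
            if (member.epoch == scheduledEpoch) {
                // The member never left the epoch it was asked to move on from.
                member.fenced = true;
            }
            // Otherwise the member has advanced and the stale timeout is ignored.
        };
    }
}
```

Because the comparison is an equality test, the sketch relies on the same assumption the comment asks about: epochs only increase, so a mismatch means the member has moved forward.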
[jira] [Commented] (KAFKA-16362) Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide
[ https://issues.apache.org/jira/browse/KAFKA-16362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825397#comment-17825397 ] Greg Harris commented on KAFKA-16362: - cc [~mjsax] [~ableegoldman] I looked through the (currently ignored) rawtypes warnings in Streams and this was one that I really didn't have a simple resolution for, and I think needs a real refactor to make type-safe. I don't think there's a bug hidden here, but the code didn't give me any confidence on that. I adjusted a test to prove to myself that the current implementation is correct, even if it is type-unsafe: [https://github.com/apache/kafka/pull/15513] Given the number of times that rawtypes are used in Streams internals, I wasn't sure if this was even a concern for you, and if you're interested in reducing the number of rawtypes used in streams over the long term. > Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide > > > Key: KAFKA-16362 > URL: https://issues.apache.org/jira/browse/KAFKA-16362 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 3.7.0 >Reporter: Greg Harris >Priority: Trivial > Labels: newbie++ > > The implementation of KStreamKStreamJoin has several places that the compiler > emits warnings for, that are later suppressed or ignored: > * LeftOrRightValue.make returns a raw LeftOrRightValue without generic > arguments, because the generic type arguments depend on the boolean input. > * Calling LeftOrRightValue includes an unchecked cast before inserting the > record into the outerJoinStore > * emitNonJoinedOuterRecords swaps the left and right values, and performs an > unchecked cast > These seem to be closely related to the isLeftSide variable, which makes the > class behave differently whether it is present on the left or right side of a > join. 
> We should figure out if these warnings can be eliminated by a refactor, > perhaps into KStreamKstreamJoin.Left and KStreamKStreamJoin.Right, or with > some generic arguments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1520152684 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1400,35 +1440,35 @@ private void cancelConsumerGroupSessionTimeout( } /** - * Schedules a revocation timeout for the member. + * Schedules a rebalance timeout for the member. * * @param groupId The group id. * @param memberId The member id. - * @param revocationTimeoutMs The revocation timeout. - * @param expectedMemberEpoch The expected member epoch. + * @param memberEpoch The member epoch. + * @param rebalanceTimeoutMsThe rebalance timeout. */ -private void scheduleConsumerGroupRevocationTimeout( +private void scheduleConsumerGroupRebalanceTimeout( String groupId, String memberId, -long revocationTimeoutMs, -int expectedMemberEpoch +int memberEpoch, +int rebalanceTimeoutMs ) { -String key = consumerGroupRevocationTimeoutKey(groupId, memberId); -timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +String key = consumerGroupRebalanceTimeoutKey(groupId, memberId); +timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { try { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); -if (member.state() != ConsumerGroupMember.MemberState.REVOKING || -member.memberEpoch() != expectedMemberEpoch) { -log.debug("[GroupId {}] Ignoring revocation timeout for {} because the member " + -"state does not match the expected state.", groupId, memberId); +if (member.memberEpoch() == memberEpoch) { +log.info("[GroupId {}] Member {} fenced from the group because " + +"it failed to transition from epoch {} within {}ms.", +groupId, memberId, memberEpoch, rebalanceTimeoutMs); +return new CoordinatorResult<>(consumerGroupFenceMember(group, member)); Review Comment: So it seems like this is one of the main changes here -- we don't validate on 
the revoking state. I may have asked this on a previous PR, but are we assuming that the member's current epoch (not the one passed in) is never less than the member epoch passed into this method? That makes sense given that the epoch is monotonically increasing, but I just wanted to confirm. As an aside, when we fence a group member, do we basically kick it out of the group and force it to rejoin? Can the client rejoin without restarting?
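The epoch check discussed in this review comment can be illustrated with a minimal sketch. It assumes a simplified in-memory model, not the real GroupMetadataManager: the class `RebalanceTimeoutSketch` and its methods are hypothetical, and the timer is replaced by a direct call so the behaviour is easy to follow. The idea, as read from the diff, is that the timeout fences the member only if its epoch is still the one recorded when the timeout was scheduled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the coordinator state; not Kafka's API.
class RebalanceTimeoutSketch {
    // memberId -> current member epoch
    private final Map<String, Integer> memberEpochs = new HashMap<>();

    // Records the epoch a member has reached (stands in for a heartbeat).
    void heartbeat(String memberId, int epoch) {
        memberEpochs.put(memberId, epoch);
    }

    // Stands in for the timer callback. epochAtSchedule is the member epoch
    // captured when the rebalance timeout was scheduled.
    // Returns true if the member was fenced (removed from the group).
    boolean onRebalanceTimeout(String memberId, int epochAtSchedule) {
        Integer current = memberEpochs.get(memberId);
        if (current != null && current == epochAtSchedule) {
            // The member did not move past the epoch in time: fence it.
            memberEpochs.remove(memberId);
            return true;
        }
        // The member advanced (or already left), so the timeout is ignored.
        return false;
    }

    boolean isMember(String memberId) {
        return memberEpochs.containsKey(memberId);
    }
}
```

In this model a member that advanced from epoch 5 to epoch 6 before the timer fires is left alone, because only equality with the scheduled epoch triggers fencing.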
[PR] MINOR: Change KStreamKstreamOuterJoinTest to use distinct left and right types [kafka]
gharris1727 opened a new pull request, #15513: URL: https://github.com/apache/kafka/pull/15513 This test uses the same value types on the left and right, and so wouldn't be sensitive to a mixup between left and right values. So I changed one of the stream types to `Long`, and updated the assertions to match. Because the implementation of the KStreamKstreamOuterJoin is not type-safe, it was unclear just from the code that a mixup was not possible. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1520152684 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1400,35 +1440,35 @@ private void cancelConsumerGroupSessionTimeout( } /** - * Schedules a revocation timeout for the member. + * Schedules a rebalance timeout for the member. * * @param groupId The group id. * @param memberId The member id. - * @param revocationTimeoutMs The revocation timeout. - * @param expectedMemberEpoch The expected member epoch. + * @param memberEpoch The member epoch. + * @param rebalanceTimeoutMsThe rebalance timeout. */ -private void scheduleConsumerGroupRevocationTimeout( +private void scheduleConsumerGroupRebalanceTimeout( String groupId, String memberId, -long revocationTimeoutMs, -int expectedMemberEpoch +int memberEpoch, +int rebalanceTimeoutMs ) { -String key = consumerGroupRevocationTimeoutKey(groupId, memberId); -timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +String key = consumerGroupRebalanceTimeoutKey(groupId, memberId); +timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { try { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); -if (member.state() != ConsumerGroupMember.MemberState.REVOKING || -member.memberEpoch() != expectedMemberEpoch) { -log.debug("[GroupId {}] Ignoring revocation timeout for {} because the member " + -"state does not match the expected state.", groupId, memberId); +if (member.memberEpoch() == memberEpoch) { +log.info("[GroupId {}] Member {} fenced from the group because " + +"it failed to transition from epoch {} within {}ms.", +groupId, memberId, memberEpoch, rebalanceTimeoutMs); +return new CoordinatorResult<>(consumerGroupFenceMember(group, member)); Review Comment: So it seems like this is one of the main changes here. 
When we fence a group member, do we basically kick it out of the group and force it to rejoin? Can the client rejoin without restarting? I may have asked this on a previous PR, but are we assuming that the member's current epoch (not the one passed in) is never less than the member epoch passed into this method? That makes sense given that the epoch is monotonically increasing, but I just wanted to confirm.
Re: [PR] Fix incorrect syntax for config [kafka]
mjsax commented on PR #15500: URL: https://github.com/apache/kafka/pull/15500#issuecomment-1989056918 Merged to `trunk` and cherry-picked to `3.7` and `3.6` branches.
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1520152684 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1400,35 +1440,35 @@ private void cancelConsumerGroupSessionTimeout( } /** - * Schedules a revocation timeout for the member. + * Schedules a rebalance timeout for the member. * * @param groupId The group id. * @param memberId The member id. - * @param revocationTimeoutMs The revocation timeout. - * @param expectedMemberEpoch The expected member epoch. + * @param memberEpoch The member epoch. + * @param rebalanceTimeoutMsThe rebalance timeout. */ -private void scheduleConsumerGroupRevocationTimeout( +private void scheduleConsumerGroupRebalanceTimeout( String groupId, String memberId, -long revocationTimeoutMs, -int expectedMemberEpoch +int memberEpoch, +int rebalanceTimeoutMs ) { -String key = consumerGroupRevocationTimeoutKey(groupId, memberId); -timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +String key = consumerGroupRebalanceTimeoutKey(groupId, memberId); +timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { try { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); -if (member.state() != ConsumerGroupMember.MemberState.REVOKING || -member.memberEpoch() != expectedMemberEpoch) { -log.debug("[GroupId {}] Ignoring revocation timeout for {} because the member " + -"state does not match the expected state.", groupId, memberId); +if (member.memberEpoch() == memberEpoch) { +log.info("[GroupId {}] Member {} fenced from the group because " + +"it failed to transition from epoch {} within {}ms.", +groupId, memberId, memberEpoch, rebalanceTimeoutMs); +return new CoordinatorResult<>(consumerGroupFenceMember(group, member)); Review Comment: So it seems like this is one of the main changes here. 
When we fence a group member, do we basically kick it out of the group and force it to rejoin? Can the client do this without restarting?
[jira] [Created] (KAFKA-16362) Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide
Greg Harris created KAFKA-16362: --- Summary: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide Key: KAFKA-16362 URL: https://issues.apache.org/jira/browse/KAFKA-16362 Project: Kafka Issue Type: Task Components: streams Affects Versions: 3.7.0 Reporter: Greg Harris The implementation of KStreamKStreamJoin has several places that the compiler emits warnings for, that are later suppressed or ignored: * LeftOrRightValue.make returns a raw LeftOrRightValue without generic arguments, because the generic type arguments depend on the boolean input. * Calling LeftOrRightValue includes an unchecked cast before inserting the record into the outerJoinStore * emitNonJoinedOuterRecords swaps the left and right values, and performs an unchecked cast These seem to be closely related to the isLeftSide variable, which makes the class behave differently whether it is present on the left or right side of a join. We should figure out if these warnings can be eliminated by a refactor, perhaps into KStreamKstreamJoin.Left and KStreamKStreamJoin.Right, or with some generic arguments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
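One way to picture the refactor suggested in this issue is the sketch below. It is only an illustration under assumptions: `LeftOrRight` is a hypothetical stand-in for LeftOrRightValue, and `JoinSide.Left` / `JoinSide.Right` are invented names, not the Kafka Streams classes. The point is that when each side of the join is its own subclass, the compiler can check which slot a value goes into, so no raw type or unchecked cast is needed.

```java
// Hypothetical stand-in for LeftOrRightValue; not the Kafka Streams class.
final class LeftOrRight<L, R> {
    final L left;
    final R right;

    private LeftOrRight(L left, R right) {
        this.left = left;
        this.right = right;
    }

    // Separate factories replace a single make(boolean, value) method,
    // whose generic arguments would depend on the boolean.
    static <L, R> LeftOrRight<L, R> ofLeft(L value) {
        return new LeftOrRight<>(value, null);
    }

    static <L, R> LeftOrRight<L, R> ofRight(R value) {
        return new LeftOrRight<>(null, value);
    }
}

// V is the value type that this side of the join receives.
abstract class JoinSide<V, L, R> {
    abstract LeftOrRight<L, R> wrap(V value);

    // Left side: incoming values have the left type.
    static final class Left<L, R> extends JoinSide<L, L, R> {
        @Override
        LeftOrRight<L, R> wrap(L value) {
            return LeftOrRight.ofLeft(value);
        }
    }

    // Right side: incoming values have the right type.
    static final class Right<L, R> extends JoinSide<R, L, R> {
        @Override
        LeftOrRight<L, R> wrap(R value) {
            return LeftOrRight.ofRight(value);
        }
    }
}
```

With distinct left and right types (for example `String` and `Long`), passing a right-typed value to `JoinSide.Left.wrap` is rejected at compile time, which is the property an `isLeftSide` boolean cannot give.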
Re: [PR] MINOR: update kraft_upgrade_test to create a new topic after metadata upgrade [kafka]
soarez commented on PR #15451: URL: https://github.com/apache/kafka/pull/15451#issuecomment-1989015331 @showuon could you have a look?
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1520125834 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr // 1. The member reported its owned partitions; Review Comment: Did we follow up here as well?
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1520116151 ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -1837,8 +1872,13 @@ void generateLeaderAndIsrUpdates(String context, // from the target ISR, but we need to exclude it here too, to handle the case // where there is an unclean leader election which chooses a leader from outside // the ISR. +// +// If the caller passed a valid broker ID for brokerWithUncleanShutdown, rather than +// passing NO_LEADER, this node should not be an acceptable leader. We also exclude +// brokerWithUncleanShutdown from ELR and ISR. IntPredicate isAcceptableLeader = -r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.isActive(r)); +r -> (r != brokerToRemove && r != brokerWithUncleanShutdown) +&& (r == brokerToAdd || clusterControl.isActive(r)); Review Comment: I disabled the handleUncleanShutdown if ELR is not enabled. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
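The leader-eligibility predicate in the diff above can be tried out in isolation with the sketch below. It is a stand-alone illustration, not the ReplicationControlManager code: the class `AcceptableLeaderSketch` is hypothetical, the `isActive` parameter replaces `clusterControl.isActive`, and the value -1 for `NO_LEADER` is an assumption made for this example.

```java
import java.util.function.IntPredicate;

// Hypothetical stand-alone version of the predicate shown in the diff.
class AcceptableLeaderSketch {
    // Assumed sentinel meaning "no such broker" for this sketch.
    static final int NO_LEADER = -1;

    static IntPredicate isAcceptableLeader(
            int brokerToRemove,
            int brokerToAdd,
            int brokerWithUncleanShutdown,
            IntPredicate isActive) {
        // A replica is acceptable if it is neither being removed nor the
        // broker that shut down uncleanly, and it is either the broker
        // being added or currently active.
        return r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)
                && (r == brokerToAdd || isActive.test(r));
    }
}
```

Passing `NO_LEADER` for `brokerWithUncleanShutdown` leaves the original behaviour unchanged as long as no real broker ID equals the sentinel, which matches the comment added in the diff.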
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1520115442 ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -2240,6 +2283,25 @@ private void updatePartitionDirectories( } } +private void updatePartitionInfo( +Uuid topicId, +Integer partitionId, +PartitionRegistration prevPartInfo, +PartitionRegistration newPartInfo +) { +HashSet validationSet = new HashSet<>(); +Arrays.stream(newPartInfo.isr).forEach(ii -> validationSet.add(ii)); +Arrays.stream(newPartInfo.elr).forEach(ii -> validationSet.add(ii)); +if (validationSet.size() != newPartInfo.isr.length + newPartInfo.elr.length) { +log.warn("{}-{} has overlapping ISR={} and ELR={}", topics.get(topicId).name, partitionId, +Arrays.toString(newPartInfo.isr), partitionId, Arrays.toString(newPartInfo.elr)); Review Comment: Correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
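For reference, an ISR/ELR overlap check can also be written as a direct intersection, as in the sketch below. This is an alternative formulation under assumptions, not the code in the pull request: `IsrElrOverlapSketch` and `overlap` are hypothetical names. The size comparison in the quoted diff also fires when a single array contains a duplicate, whereas the intersection reports only IDs present in both arrays. Note too that, as quoted, the `log.warn` call supplies five arguments for four `{}` placeholders.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical helper; the names are illustrative only.
class IsrElrOverlapSketch {
    // Returns the broker IDs that appear in both the ISR and the ELR,
    // sorted for stable log output.
    static Set<Integer> overlap(int[] isr, int[] elr) {
        Set<Integer> inIsr = Arrays.stream(isr).boxed().collect(Collectors.toSet());
        return Arrays.stream(elr)
                .boxed()
                .filter(inIsr::contains)
                .collect(Collectors.toCollection(TreeSet::new));
    }
}
```

A caller could log a warning only when the returned set is non-empty and include the offending IDs in the message.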
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1520115750 ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -336,10 +350,10 @@ public ControllerResult registerBroker( ", but got cluster ID " + request.clusterId()); } int brokerId = request.brokerId(); +List records = new ArrayList<>(); BrokerRegistration existing = brokerRegistrations.get(brokerId); if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) { -// TODO(KIP-966): Update the ELR if the broker has an unclean shutdown. -log.debug("Received an unclean shutdown request"); Review Comment: Good catch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1520113443 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated. -if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { +if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createResponseAssignment(updatedMember)); } return new CoordinatorResult<>(records, response); } +/** + * Reconciles the current assignment of the member if needed. + * + * @param groupId The group id. + * @param memberThe member to reconcile. + * @param currentPartitionEpoch The function returning the current epoch of + * a given partition. + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @param ownedTopicPartitions The list of partitions owned by the member. This + * is reported in the ConsumerGroupHeartbeat API and + * it could be null if not provided. + * @param records The list to accumulate any new records. + * @return The received member if no changes have been made; or a new + * member containing the new assignment. 
+ */ +private ConsumerGroupMember maybeReconcile( +String groupId, +ConsumerGroupMember member, +BiFunction currentPartitionEpoch, +int targetAssignmentEpoch, +Assignment targetAssignment, +List ownedTopicPartitions, +List records +) { +if (member.isReconciledTo(targetAssignmentEpoch)) { +return member; +} + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(targetAssignmentEpoch, targetAssignment) +.withCurrentPartitionEpoch(currentPartitionEpoch) +.withOwnedTopicPartitions(ownedTopicPartitions) +.build(); + +if (!updatedMember.equals(member)) { +records.add(newCurrentAssignmentRecord(groupId, updatedMember)); + +log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, " + + "assignedPartitions={} and revokedPartitions={}.", +groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), +formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions())); + +if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { +scheduleConsumerGroupRebalanceTimeout( +groupId, +updatedMember.memberId(), +updatedMember.memberEpoch(), +updatedMember.rebalanceTimeoutMs() +); +} else { Review Comment: Did we come to a resolution here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on PR #15364: URL: https://github.com/apache/kafka/pull/15364#issuecomment-1988980430 > This is a non-backward compatible change. I think that we should do this change to clean up the record. As KIP-848 is only in early access in 3.7 and we clearly state that we don't plan to support upgrades from it, this is acceptable in my opinion. Are we gating this record change under anything?
[jira] [Updated] (KAFKA-16255) AsyncKafkaConsumer should not use partition.assignment.strategy
[ https://issues.apache.org/jira/browse/KAFKA-16255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16255: -- Priority: Minor (was: Major) > AsyncKafkaConsumer should not use partition.assignment.strategy > --- > > Key: KAFKA-16255 > URL: https://issues.apache.org/jira/browse/KAFKA-16255 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > The {{partition.assignment.strategy}} configuration is used to specify a list > of zero or more {{ConsumerPartitionAssignor}} instances. However, that > interface is not applicable for the KIP-848-based protocol on top of which > {{AsyncKafkaConsumer}} is built. Therefore, the use of > {{ConsumerPartitionAssignor}} is inappropriate and should be removed from > {{{}AsyncKafkaConsumer{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15553) Review consumer positions update
[ https://issues.apache.org/jira/browse/KAFKA-15553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15553: -- Priority: Minor (was: Major) > Review consumer positions update > > > Key: KAFKA-15553 > URL: https://issues.apache.org/jira/browse/KAFKA-15553 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor, position > Fix For: 3.8.0 > > > From the existing comment: If there are any partitions which do not have a > valid position and are not awaiting reset, then we need to fetch committed > offsets. > In the async consumer: I wonder if it would make sense to refresh the > position on the event loop continuously. > The logic to refresh offsets in the poll loop is quite fragile and works > largely by side-effects of the code that it calls. For example, the behaviour > of the "cached" value is really not that straightforward and simply reading > the cached value is not sufficient to start consuming data in all cases. > This area needs a bit of a refactor. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16256) Update ConsumerConfig to validate use of group.remote.assignor and partition.assignment.strategy based on group.protocol
[ https://issues.apache.org/jira/browse/KAFKA-16256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16256: -- Priority: Minor (was: Major) > Update ConsumerConfig to validate use of group.remote.assignor and > partition.assignment.strategy based on group.protocol > > > Key: KAFKA-16256 > URL: https://issues.apache.org/jira/browse/KAFKA-16256 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > {{ConsumerConfig}} supports both the {{group.remote.assignor}} and > {{partition.assignment.strategy}} configuration options. These, however, > should not be used together; the former is applicable only when the > {{group.protocol}} is set to {{consumer}} and the latter when the > {{group.protocol}} is set to {{{}classic{}}}. We should emit a warning if the > user specifies the incorrect configuration based on the value of > {{{}group.protocol{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
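The validation described in this issue could look roughly like the sketch below. It is an illustration under assumptions, not the ConsumerConfig implementation: the class `GroupProtocolConfigSketch` and the method `warnings` are hypothetical, the configuration is modelled as a plain string map, and `classic` is assumed to be the default when `group.protocol` is unset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical validator; not part of the Kafka clients API.
class GroupProtocolConfigSketch {
    static List<String> warnings(Map<String, String> config) {
        List<String> result = new ArrayList<>();
        // Assumption for this sketch: an unset group.protocol means "classic".
        String protocol = config.getOrDefault("group.protocol", "classic");

        if ("consumer".equalsIgnoreCase(protocol)
                && config.containsKey("partition.assignment.strategy")) {
            result.add("partition.assignment.strategy is ignored when group.protocol=consumer");
        }
        if ("classic".equalsIgnoreCase(protocol)
                && config.containsKey("group.remote.assignor")) {
            result.add("group.remote.assignor is ignored when group.protocol=classic");
        }
        return result;
    }
}
```

Returning the messages instead of logging them keeps the check easy to test; a caller would decide whether to log a warning or fail.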
[jira] [Assigned] (KAFKA-16298) Ensure user callbacks exceptions are propagated to the user on consumer poll
[ https://issues.apache.org/jira/browse/KAFKA-16298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16298: - Assignee: Kirk True > Ensure user callbacks exceptions are propagated to the user on consumer poll > > > Key: KAFKA-16298 > URL: https://issues.apache.org/jira/browse/KAFKA-16298 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Kirk True >Priority: Blocker > Labels: callback, kip-848-client-support > Fix For: 3.8.0 > > > When user-defined callbacks fail with an exception, the expectation is that > the error should be propagated to the user as a KafkaException and break the > poll loop (behaviour in the legacy coordinator). The new consumer executes > callbacks in the application thread, and sends an event to the background > with the callback result and error if any, [passing the error along with the > event > here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882] > to the background thread, but does not seem to propagate the exception to > the user.
[jira] [Created] (KAFKA-16361) Rack aware sticky assignor minQuota violations
Luke D created KAFKA-16361: -- Summary: Rack aware sticky assignor minQuota violations Key: KAFKA-16361 URL: https://issues.apache.org/jira/browse/KAFKA-16361 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 3.6.1, 3.7.0, 3.5.1 Reporter: Luke D In some low topic replication scenarios the rack aware assignment in the StickyAssignor fails to balance consumers to its own expectations and throws an IllegalStateException, commonly crashing the application (depending on application implementation). While uncommon the error is deterministic, and so persists until the replication state changes. We have observed this in the wild in 3.5.1, and 3.6.1. We have reproduced it locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely would also be reproducible there) Here is the error and stack from our test case against 3.7.0 {code:java} We haven't reached the expected number of members with more than the minQuota partitions, but no more partitions to be assigned java.lang.IllegalStateException: We haven't reached the expected number of members with more than the minQuota partitions, but no more partitions to be assigned at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113) at org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91) {code} Here is a specific test case from 3.7.0 that fails when passed to StickyAssignor.assign: {code:java} Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: rack-1)], partitions = [Partition(topic = topic_name, 
partition = 57, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader = 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = topic_name, partition = 16, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 41, leader = 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 74, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 12, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 37, leader = 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = topic_name, partition = 70, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 8, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 33, 
leader = 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = topic_name, partition = 66, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 4, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 29, leader = 3, replicas = [3,1,2], isr = [3,1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 62, leader = 3, replicas = [3,2,1], isr = [3,2,1], offlineReplicas = []), Partition(topic = topic_name, partition = 95, leader = 4, replicas = [4,3,2], isr = [4,3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 0, leader = 4, replicas = [4,1,2], isr = [4,1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 25, leader
[jira] [Commented] (KAFKA-16360) Release plan of 3.x kafka releases.
[ https://issues.apache.org/jira/browse/KAFKA-16360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825386#comment-17825386 ] Justine Olshan commented on KAFKA-16360: Hey there – there was some discussion on the mailing list. 3.8 should be the last release. See here: [https://lists.apache.org/thread/kvdp2gmq5gd9txkvxh5vk3z2n55b04s5] There is also a KIP. KIP-1012: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1012%3A+The+need+for+a+Kafka+3.8.x+release] Hope this clears things up. 3.8 should be the last release. > Release plan of 3.x kafka releases. > --- > > Key: KAFKA-16360 > URL: https://issues.apache.org/jira/browse/KAFKA-16360 > Project: Kafka > Issue Type: Improvement >Reporter: kaushik srinivas >Priority: Major > > KIP > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-833%3A+Mark+KRaft+as+Production+Ready#KIP833:MarkKRaftasProductionReady-ReleaseTimeline] > mentions , > h2. Kafka 3.7 > * January 2024 > * Final release with ZK mode > But we see in Jira, some tickets are marked for 3.8 release. Does apache > continue to make 3.x releases having zookeeper and kraft supported > independent of pure kraft 4.x releases ? > If yes, how many more releases can be expected on 3.x release line ? > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16360) Release plan of 3.x kafka releases.
[ https://issues.apache.org/jira/browse/KAFKA-16360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825385#comment-17825385 ] Greg Harris commented on KAFKA-16360: - Hi [~kaushik srinivas], thanks for your question! The release schedule in that KIP was superseded by a later KIP: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1012%3A+The+need+for+a+Kafka+3.8.x+release] and so is not accurate any longer. At this time, we expect that 3.8 will be the last 3.x release, but that could change depending on which features are ready in that release. > Release plan of 3.x kafka releases. > --- > > Key: KAFKA-16360 > URL: https://issues.apache.org/jira/browse/KAFKA-16360 > Project: Kafka > Issue Type: Improvement >Reporter: kaushik srinivas >Priority: Major > > KIP > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-833%3A+Mark+KRaft+as+Production+Ready#KIP833:MarkKRaftasProductionReady-ReleaseTimeline] > mentions , > h2. Kafka 3.7 > * January 2024 > * Final release with ZK mode > But we see in Jira, some tickets are marked for 3.8 release. Does apache > continue to make 3.x releases having zookeeper and kraft supported > independent of pure kraft 4.x releases ? > If yes, how many more releases can be expected on 3.x release line ? > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16360) Release plan of 3.x kafka releases.
kaushik srinivas created KAFKA-16360: Summary: Release plan of 3.x kafka releases. Key: KAFKA-16360 URL: https://issues.apache.org/jira/browse/KAFKA-16360 Project: Kafka Issue Type: Improvement Reporter: kaushik srinivas KIP [https://cwiki.apache.org/confluence/display/KAFKA/KIP-833%3A+Mark+KRaft+as+Production+Ready#KIP833:MarkKRaftasProductionReady-ReleaseTimeline] mentions , h2. Kafka 3.7 * January 2024 * Final release with ZK mode But we see in Jira, some tickets are marked for 3.8 release. Does apache continue to make 3.x releases having zookeeper and kraft supported independent of pure kraft 4.x releases ? If yes, how many more releases can be expected on 3.x release line ?
[jira] [Commented] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective
[ https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825373#comment-17825373 ] Gaurav Narula commented on KAFKA-16359: --- [~apoorvmittal10] I stumbled upon [https://github.com/johnrengelman/shadow/issues/324] and figured it might be useful when you take this up. > kafka-clients-3.7.0.jar published to Maven Central is defective > --- > > Key: KAFKA-16359 > URL: https://issues.apache.org/jira/browse/KAFKA-16359 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.7.0 >Reporter: Jeremy Norris >Assignee: Apoorv Mittal >Priority: Critical > > The {{kafka-clients-3.7.0.jar}} that has been published to Maven Central is > defective: its {{META-INF/MANIFEST.MF}} bogusly includes a {{Class-Path}} > element: > {code} > Manifest-Version: 1.0 > Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10 > .5.jar slf4j-api-1.7.36.jar > {code} > This bogus {{Class-Path}} element leads to compiler warnings for projects > that utilize it as a dependency: > {code} > [WARNING] [path] bad path element > ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar": > no such file or directory > [WARNING] [path] bad path element > ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar": > no such file or directory > [WARNING] [path] bad path element > ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar": > no such file or directory > [WARNING] [path] bad path element > ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar": > no such file or directory > {code} > Either the {{kafka-clients-3.7.0.jar}} needs to be respun and published > without the bogus {{Class-Path}} element in its {{META-INF/MANIFEST.MF}} or > a new release should be published that corrects this defect.
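For anyone who wants to check a jar for the defect reported in KAFKA-16359, the following is a minimal sketch using only `java.util.jar` from the JDK. It parses manifest text and reports whether a `Class-Path` main attribute is present. The class name `ManifestClassPathCheck` and the helper `classPathOf` are made up for illustration and are not part of Kafka or its build; the sample manifest mirrors the one quoted in the report, including the continuation line that starts with a space.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

class ManifestClassPathCheck {

    // Hypothetical helper: returns the Class-Path main attribute,
    // or null when the manifest does not declare one.
    public static String classPathOf(String manifestText) {
        try {
            Manifest manifest = new Manifest(
                new ByteArrayInputStream(manifestText.getBytes(StandardCharsets.UTF_8)));
            return manifest.getMainAttributes().getValue(Attributes.Name.CLASS_PATH);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Same content as the manifest quoted in the issue; the second
        // Class-Path line is a continuation and begins with one space.
        String sample =
            "Manifest-Version: 1.0\n"
          + "Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10\n"
          + " .5.jar slf4j-api-1.7.36.jar\n"
          + "\n";
        String classPath = classPathOf(sample);
        System.out.println(classPath == null
            ? "no Class-Path entry"
            : "Class-Path present: " + classPath);
    }
}
```

To inspect a real jar, the manifest could instead be obtained from `java.util.jar.JarFile#getManifest()`; the attribute lookup stays the same.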
Re: [PR] KAFKA-16227: Avoid IllegalStateException during fetch initialization [kafka]
cadonna commented on code in PR #15491: URL: https://github.com/apache/kafka/pull/15491#discussion_r1519991935 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java: ## @@ -278,32 +278,48 @@ private CompletedFetch handleInitializeSuccess(final CompletedFetch completedFet } } -if (partition.highWatermark() >= 0) { -log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark()); -subscriptions.updateHighWatermark(tp, partition.highWatermark()); +if (!updatePartitionState(partition, tp)) { +return null; +} + +completedFetch.setInitialized(); +return completedFetch; +} + +private boolean updatePartitionState(final FetchResponseData.PartitionData partitionData, + final TopicPartition tp) { +if (partitionData.highWatermark() >= 0) { +log.trace("Updating high watermark for partition {} to {}", tp, partitionData.highWatermark()); +if (!subscriptions.tryUpdatingHighWatermark(tp, partitionData.highWatermark())) { Review Comment: Yep!
Re: [PR] KAFKA-16227: Avoid IllegalStateException during fetch initialization [kafka]
cadonna commented on PR #15491: URL: https://github.com/apache/kafka/pull/15491#issuecomment-1988816400 > LGTM! I think it's a good solution in the current architecture, though arguably a bit smelly. Agree on the arguable smell. The alternative would be to use locks which I disliked because locks are not needed for the legacy consumer.
[jira] [Commented] (KAFKA-16354) FinalizedFeatureChangeListenerTest should use mocked latches
[ https://issues.apache.org/jira/browse/KAFKA-16354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825365#comment-17825365 ] PoAn Yang commented on KAFKA-16354: --- Hi [~gharris1727], I am interested in this. May I assign to myself? Thank you. > FinalizedFeatureChangeListenerTest should use mocked latches > > > Key: KAFKA-16354 > URL: https://issues.apache.org/jira/browse/KAFKA-16354 > Project: Kafka > Issue Type: Test > Components: core >Affects Versions: 3.7.0 >Reporter: Greg Harris >Priority: Trivial > Labels: newbie > > testCacheUpdateWaitFailsForUnreachableVersion takes 30 seconds, and > testInitFailureDueToFeatureIncompatibility takes 5 seconds. This appears to > be caused by FeatureCacheUpdater#awaitUpdateOrThrow waiting for a real > CountDownLatch with a real-time timeout. > Instead, a mocked latch should be used to exit the await immediately. > This should be done both for CPU-independence, and for test execution speed.
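The idea in KAFKA-16354 of exiting the await immediately can be sketched without a mocking library by subclassing `java.util.concurrent.CountDownLatch`. This is only an illustration under assumptions: `awaitOrThrow` below is a hypothetical stand-in, not the real `FeatureCacheUpdater#awaitUpdateOrThrow`, and the sketch assumes the code under test can be handed its latch, which the ticket does not confirm.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class ImmediateLatchSketch {

    // Test double: reports a timeout right away instead of blocking in real time.
    static final class NeverReleasedLatch extends CountDownLatch {
        NeverReleasedLatch() {
            super(1);
        }

        @Override
        public boolean await(long timeout, TimeUnit unit) {
            return false; // behave as if the whole timeout had already elapsed
        }
    }

    // Hypothetical stand-in for the code under test: waits on whatever latch it is given
    // and throws if the latch is not released in time.
    public static void awaitOrThrow(CountDownLatch latch, long timeoutMs) {
        try {
            if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Timed out after " + timeoutMs + " ms");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        try {
            // A 30 second timeout is requested, but the double never sleeps.
            awaitOrThrow(new NeverReleasedLatch(), 30_000L);
        } catch (IllegalStateException e) {
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            System.out.println("Timeout path reached after about " + elapsedMs + " ms");
        }
    }
}
```

A Mockito mock of the latch would serve the same purpose; the subclass is used here only to keep the sketch free of dependencies.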
[PR] KAFKA-12187: replace assertTrue(obj instanceof X) with assertInstanceOf [kafka]
brandboat opened a new pull request, #15512: URL: https://github.com/apache/kafka/pull/15512 as title, replace - `assertTrue(obj instanceof X)` with `assertInstanceOf(X.class, obj)` - `assertTrue(obj instanceof X, errormessage)` with `assertInstanceOf(X.class, obj, errormessage)` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
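To show what the replacement in this PR buys, here is a small sketch that runs without JUnit on the classpath. The local `assertInstanceOf` helper is a stand-in written for this example; it only mirrors the general shape of JUnit 5's `Assertions.assertInstanceOf(Class, Object, String)`, which checks the type and returns the value already cast. The failure message format below is invented and is not JUnit's.

```java
class AssertInstanceOfSketch {

    // Local stand-in for illustration only; the real method lives in
    // org.junit.jupiter.api.Assertions and its message format differs.
    public static <T> T assertInstanceOf(Class<T> expectedType, Object actual, String message) {
        if (!expectedType.isInstance(actual)) {
            String actualType = actual == null ? "null" : actual.getClass().getName();
            throw new AssertionError(message + ": expected instance of "
                + expectedType.getName() + " but was " + actualType);
        }
        return expectedType.cast(actual);
    }

    public static void main(String[] args) {
        Object cause = new IllegalStateException("boom");

        // Old style: assertTrue(cause instanceof RuntimeException) can only say
        // that a boolean was false, and a separate cast is needed afterwards.
        // New style: the failure names both types and the typed value is returned.
        RuntimeException typed =
            assertInstanceOf(RuntimeException.class, cause, "unexpected cause type");
        System.out.println(typed.getMessage());
    }
}
```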
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on PR #15511: URL: https://github.com/apache/kafka/pull/15511#issuecomment-1988762078 @lianetm @dajac Could you please have a look?
[jira] [Commented] (KAFKA-16114) Fix partiton not retention after cancel alter intra broker log dir task
[ https://issues.apache.org/jira/browse/KAFKA-16114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825363#comment-17825363 ] Divij Vaidya commented on KAFKA-16114: -- Sorry [~albedooo] , I won't have bandwidth any time soon to look into this. > Fix partiton not retention after cancel alter intra broker log dir task > > > Key: KAFKA-16114 > URL: https://issues.apache.org/jira/browse/KAFKA-16114 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 3.3.2, 3.6.1 >Reporter: wangliucheng >Priority: Major > > The deletion thread does not process a partition after an alter intra-broker > log dir task for it has been cancelled. > The steps to reproduce are as follows: > 1. Create the reassignment.json file. > test01-1 is in the /data01/kafka/log01 directory of broker 1003, and is to be moved to > /data01/kafka/log02 > {code:java} > { > "version": 1, > "partitions": [ > { > "topic": "test01", > "partition": 1, > "replicas": [1001,1003], > "log_dirs": ["any","/data01/kafka/log02"] > } > ] > }{code} > 2. Kick off the reassignment > {code:java} > bin/kafka-reassign-partitions.sh -bootstrap-server localhost:9092 > --reassignment-json-file reassignment.json -execute {code} > 3. Cancel the reassignment > {code:java} > bin/kafka-reassign-partitions.sh -bootstrap-server localhost:9092 > --reassignment-json-file reassignment.json -cancel {code} > 4. Result: the partition test01-1 on 1003 will not be deleted > The reason for this problem is that the partition has been filtered out: > {code:java} > val deletableLogs = logs.filter { > case (_, log) => !log.config.compact // pick non-compacted logs > }.filterNot { > case (topicPartition, _) => inProgress.contains(topicPartition) // skip any > logs already in-progress > } {code}
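The effect of the filter quoted in KAFKA-16114 can be reproduced in isolation. The sketch below is a simplified Java rendering of the same two-step filter, not the broker's code: partitions are plain strings, the compaction flag is a boolean, and the names `DeletableLogsSketch` and `deletableLogs` are invented. It also assumes, as the report implies but does not show, that the cancelled partition is still listed in the in-progress set.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class DeletableLogsSketch {

    // compactedByPartition maps a partition name to whether its log is compacted.
    public static List<String> deletableLogs(Map<String, Boolean> compactedByPartition,
                                             Set<String> inProgress) {
        return compactedByPartition.entrySet().stream()
            .filter(entry -> !entry.getValue())                    // pick non-compacted logs
            .map(Map.Entry::getKey)
            .filter(partition -> !inProgress.contains(partition))  // skip logs already in progress
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Boolean> logs = new LinkedHashMap<>();
        logs.put("test01-0", false);
        logs.put("test01-1", false);

        // Assumption: after the cancel, test01-1 was never removed from the set.
        Set<String> inProgress = Set.of("test01-1");

        // test01-1 is filtered out on every pass, so retention never reaches it.
        System.out.println("Deletable: " + deletableLogs(logs, inProgress));
    }
}
```

Under that assumption, a fix would need to remove the partition from the in-progress set when the task is cancelled; the report does not say which code path should do so.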
[PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru opened a new pull request, #15511: URL: https://github.com/apache/kafka/pull/15511 The goal of this PR is to change the following internals of the reconciliation: - Introduce a "local epoch" to the local target assignment. When a new target is received from the server, we compare it with the current value. If it is the same, no change. Otherwise, we bump the local epoch and store the new target assignment. Then, on the reconciliation, we also store the epoch in the reconciled assignment and keep using target != current to trigger the reconciliation. - When we are not in a group (we have not received an assignment), we use null to represent the local target assignment instead of an empty list, to avoid confusion with an empty assignment received from the server. Similarly, we use null to represent the current assignment, when we haven't reconciled the assignment yet. - We also carry the new epoch into the request builder to ensure that we report the owned partitions for the last local epoch. - To address [KAFKA-16312](https://issues.apache.org/jira/browse/KAFKA-16312) (call onPartitionsAssigned on empty assignments after joining), we apply the initial assignment returned by the group coordinator (whether empty or not) as a normal reconciliation. This avoids introducing another code path to trigger rebalance listeners - reconciliation is the only way to transition to STABLE. The unneeded parts of reconciliation (autocommit, revocation) will be skipped in the existing code path. Since a lot of unit tests assumed that no reconciliation behavior is invoked when joining the group with an empty assignment, this required a lot of changes in the unit tests. ## Testing These changes allow the new consumer to pass the first 10 system tests. We piggy-back a minor change to the `HeartbeatManager` that is required for those system tests as well: always send `rebalanceTimeoutMs`, `subscriptions` when (re-)joining. 
### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
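The local-epoch bookkeeping described in this PR can be sketched in a few lines. This is an illustration of the described rules only, not the consumer's actual membership code: every name here (`LocalEpochSketch`, `LocalAssignment`, `onTargetReceived`, `needsReconciliation`, `reconcile`) is hypothetical, partitions are plain strings, and reconciliation is reduced to copying the target.

```java
import java.util.Objects;
import java.util.Set;

class LocalEpochSketch {

    // An assignment tagged with the local epoch at which it was stored.
    static final class LocalAssignment {
        final long localEpoch;
        final Set<String> partitions;

        LocalAssignment(long localEpoch, Set<String> partitions) {
            this.localEpoch = localEpoch;
            this.partitions = Set.copyOf(partitions);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof LocalAssignment)) return false;
            LocalAssignment other = (LocalAssignment) o;
            return localEpoch == other.localEpoch && partitions.equals(other.partitions);
        }

        @Override
        public int hashCode() {
            return Objects.hash(localEpoch, partitions);
        }
    }

    private long localEpoch = 0;
    private LocalAssignment target = null;   // null: no assignment received yet
    private LocalAssignment current = null;  // null: nothing reconciled yet

    // Stores a new target and bumps the epoch only if the partitions changed.
    public boolean onTargetReceived(Set<String> partitions) {
        if (target != null && target.partitions.equals(partitions)) {
            return false; // same target as before, no change
        }
        target = new LocalAssignment(++localEpoch, partitions);
        return true;
    }

    // Reconciliation is triggered whenever target != current.
    public boolean needsReconciliation() {
        return target != null && !target.equals(current);
    }

    // Simplified reconciliation: the reconciled assignment keeps the target's epoch.
    public void reconcile() {
        current = target;
    }

    public long targetEpoch() {
        return target == null ? -1 : target.localEpoch;
    }

    public static void main(String[] args) {
        LocalEpochSketch state = new LocalEpochSketch();
        state.onTargetReceived(Set.of()); // empty initial assignment
        System.out.println("Empty assignment needs reconciliation: " + state.needsReconciliation());
    }
}
```

Because the not-yet-reconciled state is `null` rather than an empty set, an empty initial assignment still differs from it and goes through reconciliation, which matches the behaviour the PR describes for KAFKA-16312.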