Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
sjhajharia commented on code in PR #15486: URL: https://github.com/apache/kafka/pull/15486#discussion_r1522533304 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -42,6 +42,11 @@ object AddPartitionsToTxnManager { val VerificationTimeMsMetricName = "VerificationTimeMs" } +object OperationExpected extends Enumeration { + type operation = Value + val defaultOperation, latestProduceVersion, addPartitionWithoutVerification = Value Review Comment: Ack, updated the code to use a `sealed trait`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]
showuon commented on code in PR #15476: URL: https://github.com/apache/kafka/pull/15476#discussion_r1522451780 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ## @@ -293,11 +293,11 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, if (timestampType == TimestampType.LOG_APPEND_TIME) { maxTimestamp = now; -offsetOfMaxTimestamp = initialOffset; -} - -if (toMagic >= RecordBatch.MAGIC_VALUE_V2) { -offsetOfMaxTimestamp = offsetCounter.value - 1; +if (toMagic >= RecordBatch.MAGIC_VALUE_V2) { +offsetOfMaxTimestamp = offsetCounter.value - 1; Review Comment: > I prefer the consistent behavior. Hence, it seems to me that adding documentation to `OffsetSpec.MaxTimestampSpec` is enough here. Use cases that depend on the previous behavior (last offset) should be rewritten for this bug fix (and behavior change). +1 from me.
Re: [PR] [DO NOT MERGE] KAFKA-14419: limit time spent processing during ongoing rebalance and delay followup rebalance trigger [kafka]
github-actions[bot] commented on PR #15009: URL: https://github.com/apache/kafka/pull/15009#issuecomment-1993330968 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1522447288 ## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ## @@ -242,34 +242,23 @@ public MemoryRecords build() { /** * Get the max timestamp and its offset. The details of the offset returned are a bit subtle. + * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp. * - * If the log append time is used, the offset will be the last offset unless no compression is used and - * the message format version is 0 or 1, in which case, it will be the first offset. + * If the log append time is used, the offset will be the first offset of the record. * - * If create time is used, the offset will be the last offset unless no compression is used and the message - * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp. + * If create time is used, the offset will always be the offset of the record with the max timestamp. + * + * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records. 
* * @return The max timestamp and its offset */ public RecordsInfo info() { if (timestampType == TimestampType.LOG_APPEND_TIME) { -long shallowOffsetOfMaxTimestamp; -// Use the last offset when dealing with record batches -if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) -shallowOffsetOfMaxTimestamp = lastOffset; -else -shallowOffsetOfMaxTimestamp = baseOffset; -return new RecordsInfo(logAppendTime, shallowOffsetOfMaxTimestamp); -} else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) { -return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset); +return new RecordsInfo(logAppendTime, baseOffset); } else { -long shallowOffsetOfMaxTimestamp; -// Use the last offset when dealing with record batches -if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) -shallowOffsetOfMaxTimestamp = lastOffset; -else -shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp; -return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp); +// For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping +// If it's MAGIC_VALUE_V0, the value will be the default value: [-1, -1] +return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp); Review Comment: Currently, `assignOffsetsNonCompressed` and `validateMessagesAndAssignOffsetsCompressed` always return `[-1, -1]` for the max timestamp and its offset when the magic is `MAGIC_VALUE_V0` [here](https://github.com/apache/kafka/pull/15474/files#diff-74aba980d40ab3c5c6fd66d7a27ffb4515181130e475b589834538d05aa408b9L263-L264). But when the records are rebuilt, we return `[-1, lastOffset]`, which is inconsistent. Fixing it here.
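The rules in the new Javadoc can be sketched in isolation. This is a hedged illustration, not the real `MemoryRecordsBuilder`: the `RecordsInfoSketch` class, the `TimestampKind` enum and the method parameters are hypothetical, and `-1` stands for both "no timestamp" and "no offset", matching the `[-1, -1]` default mentioned in the review comment.

```java
// Hypothetical stand-in for the two timestamp types discussed in the Javadoc.
enum TimestampKind { CREATE_TIME, LOG_APPEND_TIME }

final class RecordsInfoSketch {
    static final long NO_TIMESTAMP = -1L;

    /**
     * Returns {maxTimestamp, offsetOfMaxTimestamp} for a batch whose records have the given
     * timestamps and consecutive offsets starting at baseOffset.
     */
    static long[] info(TimestampKind type, long logAppendTime, long baseOffset, long[] timestamps) {
        if (type == TimestampKind.LOG_APPEND_TIME) {
            // Every record gets the same append time, so the first offset is reported.
            return new long[] {logAppendTime, baseOffset};
        }
        long maxTimestamp = NO_TIMESTAMP;
        long offsetOfMaxTimestamp = -1L;
        for (int i = 0; i < timestamps.length; i++) {
            // Strict '>' keeps the FIRST offset when several records share the max timestamp.
            if (timestamps[i] > maxTimestamp) {
                maxTimestamp = timestamps[i];
                offsetOfMaxTimestamp = baseOffset + i;
            }
        }
        // With no timestamps (all NO_TIMESTAMP, as for magic v0) this stays {-1, -1}.
        return new long[] {maxTimestamp, offsetOfMaxTimestamp};
    }
}
```

The strict `>` comparison produces the "first offset among records sharing the max timestamp" semantic. Switching it to `>=` would return the last such offset instead.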
Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]
Phuc-Hong-Tran commented on PR #15188: URL: https://github.com/apache/kafka/pull/15188#issuecomment-1993206506 The regex validity check will be included in the next pull request; I'll try to get it done by this weekend.
Re: [PR] KAFKA-15716: KRaft support in EpochDrivenReplicationProtocolAcceptanceTest [kafka]
highluck commented on code in PR #15295: URL: https://github.com/apache/kafka/pull/15295#discussion_r1522384690 ## core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala: ## @@ -53,17 +58,26 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit val topic = "topic1" val msg = new Array[Byte](1000) val msgBigger = new Array[Byte](1) - var brokers: Seq[KafkaServer] = _ + var brokers: Seq[KafkaBroker] = _ var producer: KafkaProducer[Array[Byte], Array[Byte]] = _ var consumer: Consumer[Array[Byte], Array[Byte]] = _ + private def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT + private def listenerName: ListenerName = ListenerName.forSecurityProtocol(securityProtocol) + @BeforeEach override def setUp(testInfo: TestInfo): Unit = { +if (TestInfoUtils.isKRaft(testInfo) && metadataVersion.isLessThan(IBP_3_3_IV0)) { Review Comment: Oh, thank you! It does seem unnecessary. I'll edit it.
Re: [PR] KAFKA-15716: KRaft support in EpochDrivenReplicationProtocolAcceptanceTest [kafka]
highluck commented on PR #15295: URL: https://github.com/apache/kafka/pull/15295#issuecomment-1993132575 @mimaison Thanks for the review. I felt I needed to think about this a little more, so I thought it would be better to handle it in a follow-up PR, because the test does not work with the broker structure currently in use. Once the current work is finished, I will do the follow-up!
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1522334545 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated. -if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { +if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createResponseAssignment(updatedMember)); } return new CoordinatorResult<>(records, response); } +/** + * Reconciles the current assignment of the member if needed. + * + * @param groupId The group id. + * @param memberThe member to reconcile. + * @param currentPartitionEpoch The function returning the current epoch of + * a given partition. + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @param ownedTopicPartitions The list of partitions owned by the member. This + * is reported in the ConsumerGroupHeartbeat API and + * it could be null if not provided. + * @param records The list to accumulate any new records. + * @return The received member if no changes have been made; or a new + * member containing the new assignment. 
+ */ +private ConsumerGroupMember maybeReconcile( +String groupId, +ConsumerGroupMember member, +BiFunction currentPartitionEpoch, +int targetAssignmentEpoch, +Assignment targetAssignment, +List ownedTopicPartitions, +List records +) { +if (member.isReconciledTo(targetAssignmentEpoch)) { +return member; +} + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(targetAssignmentEpoch, targetAssignment) +.withCurrentPartitionEpoch(currentPartitionEpoch) +.withOwnedTopicPartitions(ownedTopicPartitions) +.build(); + +if (!updatedMember.equals(member)) { +records.add(newCurrentAssignmentRecord(groupId, updatedMember)); + +log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, " + + "assignedPartitions={} and revokedPartitions={}.", +groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), +formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions())); + +if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { +scheduleConsumerGroupRebalanceTimeout( +groupId, +updatedMember.memberId(), +updatedMember.memberEpoch(), +updatedMember.rebalanceTimeoutMs() +); +} else { Review Comment: I see. I think it is a little confusing for people trying to understand the algorithm, but I agree that it is important for it to apply, and be used, in a way that makes sense. So, just to get it straight: we have a timeout only for the revocation of partitions. If we hit the timeout, the member is fenced, and I assume we can count those partitions as revoked (or at least we no longer have to wait for them to be revoked). Once all partitions are revoked, we can try to assign new partitions to members. In that case, are we relying on the heartbeat to kick members out if they aren't responding? And do we expect the assignment to happen as long as requests are going through?
Just trying to confirm the reason for the separate timeout here. Is it because revoking is more likely to fail even while heartbeats still go through, whereas assigning is not?
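To make the question concrete, here is a minimal sketch of the timer behavior as the comment describes it. A deadline is armed only while the member is in the `UNREVOKED_PARTITIONS` state, and a member that misses it would be fenced. Everything here is hypothetical (`RevocationTimeoutSketch`, `SketchMemberState`, the in-memory deadline map), and it assumes the elided `else` branch cancels any pending timeout. It is not the group coordinator's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, reduced set of states; only UNREVOKED_PARTITIONS comes from the excerpt above.
enum SketchMemberState { STABLE, UNREVOKED_PARTITIONS }

final class RevocationTimeoutSketch {
    // memberId -> absolute deadline (ms) of the pending revocation timeout, if any.
    private final Map<String, Long> revocationDeadlines = new HashMap<>();

    /**
     * Called whenever reconciliation produces a new member state. A deadline is armed only
     * while the member still has partitions to revoke. Any other state clears it
     * (assumption: this mirrors the elided else-branch).
     */
    void onNewState(String memberId, SketchMemberState state, long nowMs, long rebalanceTimeoutMs) {
        if (state == SketchMemberState.UNREVOKED_PARTITIONS) {
            revocationDeadlines.put(memberId, nowMs + rebalanceTimeoutMs);
        } else {
            revocationDeadlines.remove(memberId);
        }
    }

    /** True if the member has not revoked its partitions by the deadline and would be fenced. */
    boolean shouldFence(String memberId, long nowMs) {
        Long deadline = revocationDeadlines.get(memberId);
        return deadline != null && nowMs >= deadline;
    }
}
```

In this sketch, assigning new partitions has no deadline of its own. A member that stops responding after revocation would only be caught by the heartbeat/session mechanism, which is exactly what the question above is probing.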
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1522334545 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated. -if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { +if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createResponseAssignment(updatedMember)); } return new CoordinatorResult<>(records, response); } +/** + * Reconciles the current assignment of the member if needed. + * + * @param groupId The group id. + * @param memberThe member to reconcile. + * @param currentPartitionEpoch The function returning the current epoch of + * a given partition. + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @param ownedTopicPartitions The list of partitions owned by the member. This + * is reported in the ConsumerGroupHeartbeat API and + * it could be null if not provided. + * @param records The list to accumulate any new records. + * @return The received member if no changes have been made; or a new + * member containing the new assignment. 
+ */ +private ConsumerGroupMember maybeReconcile( +String groupId, +ConsumerGroupMember member, +BiFunction currentPartitionEpoch, +int targetAssignmentEpoch, +Assignment targetAssignment, +List ownedTopicPartitions, +List records +) { +if (member.isReconciledTo(targetAssignmentEpoch)) { +return member; +} + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(targetAssignmentEpoch, targetAssignment) +.withCurrentPartitionEpoch(currentPartitionEpoch) +.withOwnedTopicPartitions(ownedTopicPartitions) +.build(); + +if (!updatedMember.equals(member)) { +records.add(newCurrentAssignmentRecord(groupId, updatedMember)); + +log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, " + + "assignedPartitions={} and revokedPartitions={}.", +groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), +formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions())); + +if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { +scheduleConsumerGroupRebalanceTimeout( +groupId, +updatedMember.memberId(), +updatedMember.memberEpoch(), +updatedMember.rebalanceTimeoutMs() +); +} else { Review Comment: I see. I think it is a little confusing for people trying to understand the algorithm, but I agree that it is important for it to apply, and be used, in a way that makes sense. So, just to get it straight: we have a timeout only for the revocation of partitions. If we hit the timeout, the member is fenced, and I assume we can count those partitions as revoked (or at least we no longer have to wait for them to be revoked). Once all partitions are revoked, we can try to assign new partitions to members. In that case, are we relying on the heartbeat to kick members out if they don't ack the new assignments? Just trying to confirm the reason for the separate timeout here.
Is it because revoking is more likely to fail even while heartbeats still go through, whereas assigning is not?
[jira] [Commented] (KAFKA-16354) FinalizedFeatureChangeListenerTest should use mocked latches
[ https://issues.apache.org/jira/browse/KAFKA-16354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825871#comment-17825871 ] PoAn Yang commented on KAFKA-16354: --- Hi [~gharris1727], it looks like only one of testCacheUpdateWaitFailsForUnreachableVersion and testInitFailureDueToFeatureIncompatibility waits in CountDownLatch#await. testCacheUpdateWaitFailsForUnreachableVersion waits in ZkMetadataCache#waitUntilFeatureEpochOrThrow, which reads System.nanoTime directly, so we can't use a mocked latch for it. testInitFailureDueToFeatureIncompatibility does wait on a CountDownLatch. However, we may need to change the input of FinalizedFeatureChangeListener#initOrThrow so that a mocked latch can be passed in. Do we want to do that? Thank you. > FinalizedFeatureChangeListenerTest should use mocked latches > > > Key: KAFKA-16354 > URL: https://issues.apache.org/jira/browse/KAFKA-16354 > Project: Kafka > Issue Type: Test > Components: core >Affects Versions: 3.7.0 >Reporter: Greg Harris >Priority: Trivial > Labels: newbie > > testCacheUpdateWaitFailsForUnreachableVersion takes 30 seconds, and > testInitFailureDueToFeatureIncompatibility takes 5 seconds. This appears to > be caused by FeatureCacheUpdater#awaitUpdateOrThrow waiting for a real > CountDownLatch with a real-time timeout. > Instead, a mocked latch should be used to exit the await immediately. > This should be done both for CPU-independence, and for test execution speed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
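One way to make such a wait return immediately is sketched below. It assumes the code under test accepts the latch as a parameter, which, as the comment notes, `initOrThrow` would first need to be changed to allow. `java.util.concurrent.CountDownLatch` is not final, so a test can subclass it and override `await(long, TimeUnit)`. `ImmediateTimeoutLatch`, `LatchWaiter` and `waitForUpdate` are hypothetical names, not existing Kafka classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical test double: reports a timeout immediately instead of blocking in real time.
class ImmediateTimeoutLatch extends CountDownLatch {
    ImmediateTimeoutLatch() {
        super(1); // never counted down, like a cache update that never arrives
    }

    @Override
    public boolean await(long timeout, TimeUnit unit) {
        return false; // behave as if the timeout elapsed, without sleeping
    }
}

final class LatchWaiter {
    /** Hypothetical code under test: waits on the injected latch and fails on timeout. */
    static void waitForUpdate(CountDownLatch latch, long timeoutMs) throws InterruptedException {
        if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new IllegalStateException("Timed out after " + timeoutMs + " ms");
        }
    }
}
```

With the fake latch, a 30-second timeout path fails right away, which addresses both the test-speed and the CPU-independence concerns raised in the issue.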
Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]
chia7712 commented on code in PR #15476: URL: https://github.com/apache/kafka/pull/15476#discussion_r1522318718 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ## @@ -293,11 +293,11 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, if (timestampType == TimestampType.LOG_APPEND_TIME) { maxTimestamp = now; -offsetOfMaxTimestamp = initialOffset; -} - -if (toMagic >= RecordBatch.MAGIC_VALUE_V2) { -offsetOfMaxTimestamp = offsetCounter.value - 1; +if (toMagic >= RecordBatch.MAGIC_VALUE_V2) { +offsetOfMaxTimestamp = offsetCounter.value - 1; Review Comment: It seems KIP-734 does not define the case we are discussing: whether to return the initial or the last offset when multiple records share the same "max timestamp". So how should we deal with it? Is it good practice to discuss compatibility here? Or do we treat it as a "bug fix" and add enough documentation to explain the "correct behavior"? I prefer the consistent behavior. Hence, it seems to me that adding documentation to `OffsetSpec.MaxTimestampSpec` is enough here. Use cases that depend on the previous behavior (last offset) should be rewritten for this bug fix (and behavior change).
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jolshan commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1522316891 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/MemberState.java: ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import java.util.HashMap; +import java.util.Map; + +/** + * The various states that a member can be in. For their definition, + * refer to the documentation of {{@link CurrentAssignmentBuilder}}. Review Comment: OK, a really minor nit, but I guess it is less in the documentation of the file and more in the comments of the file. Not sure if we care to make that distinction, so I'll leave it to you to decide.
Re: [PR] MINOR; Make string from array [kafka]
jolshan commented on code in PR #15526: URL: https://github.com/apache/kafka/pull/15526#discussion_r1522315310 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1564,7 +1564,7 @@ object LogManager { Option(newTopicsImage.getPartition(topicId, partitionId)) match { case Some(partition) => if (!partition.replicas.contains(brokerId)) { -info(s"Found stray log dir $log: the current replica assignment ${partition.replicas} " + +info(s"Found stray log dir $log: the current replica assignment ${partition.replicas.mkString("[", ", ", "]")} " + Review Comment: OK. I guess our suggestions are the same. It was a bit less readable to me, which is why I was confused.
Re: [PR] MINOR: simplify consumer logic [kafka]
mjsax commented on PR #15519: URL: https://github.com/apache/kafka/pull/15519#issuecomment-1992854817 The PR update does not show up right now. From the GitHub Status page: > Update - We're continuing to investigate an elevated number of pull requests that are out of sync on page load. Mar 13, 2024 - 00:12 UTC Just in case you are wondering.
Re: [PR] MINOR; Make string from array [kafka]
jsancio commented on code in PR #15526: URL: https://github.com/apache/kafka/pull/15526#discussion_r1522311967 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1564,7 +1564,7 @@ object LogManager { Option(newTopicsImage.getPartition(topicId, partitionId)) match { case Some(partition) => if (!partition.replicas.contains(brokerId)) { -info(s"Found stray log dir $log: the current replica assignment ${partition.replicas} " + +info(s"Found stray log dir $log: the current replica assignment ${partition.replicas.mkString("[", ", ", "]")} " + Review Comment: Why? The `"["` is the starting string, `", "` is the separator and `"]"` is the ending string. `Array(1,2).mkString("[", ", ", "]")` returns `"[1, 2]"`.
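For readers more familiar with Java, `Collectors.joining(delimiter, prefix, suffix)` is a rough counterpart of Scala's `mkString(start, sep, end)`. Note the different argument order. The sketch also illustrates why the change matters: assuming `partition.replicas` is a Java `int[]`, interpolating it directly prints the array's identity string (`[I@` followed by a hash) instead of its elements. `ReplicaFormat` is a hypothetical helper name.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

final class ReplicaFormat {
    /** Formats replicas as "[1, 2]", mirroring replicas.mkString("[", ", ", "]") in Scala. */
    static String format(int[] replicas) {
        return Arrays.stream(replicas)
                .mapToObj(Integer::toString)
                .collect(Collectors.joining(", ", "[", "]")); // delimiter, prefix, suffix
    }
}
```

For example, `ReplicaFormat.format(new int[] {1, 2})` yields `"[1, 2]"`, the same string `Arrays.toString` produces for that array.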
Re: [PR] MINOR: simplify consumer logic [kafka]
mjsax commented on PR #15519: URL: https://github.com/apache/kafka/pull/15519#issuecomment-1992841168 Thanks for all the input! Updated the PR accordingly.
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
mjsax commented on code in PR #15510: URL: https://github.com/apache/kafka/pull/15510#discussion_r1522304963 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords( try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { TimestampedKeyAndJoinSide prevKey = null; +boolean outerJoinLeftBreak = false; +boolean outerJoinRightBreak = false; while (it.hasNext()) { -boolean outerJoinLeftBreak = false; -boolean outerJoinRightBreak = false; Review Comment: > Will try to make a unit-test for this. Thanks! That's awesome! > The documentation says that the ordering of the KeyValueIterator is NOT guaranteed. But this is internal, right? This helper store is not pluggable, so I think we could rely on ordering? -- We do store the data (i.e., the key) as ``, so we should be able to break the loop when we get the first timestamp that is too large (independent of whether it's a left or right record). cf. `TimestampedKeyAndJoinSideSerializer.java`
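To illustrate the reasoning above, here is a minimal sketch that assumes the store iterates in timestamp order (the property the comment attributes to the key layout). Once the first too-new timestamp shows up, nothing after it can be expired, so a single `break` suffices and no per-side flags are needed. `Entry`, `STORE_ORDER` and `emitExpired` are hypothetical names, and the tie-break on join side and key is an assumption:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, simplified model of the outer-join store scan; not the code in KStreamKStreamJoin.
final class OuterJoinScanSketch {

    // Stand-in for TimestampedKeyAndJoinSide.
    static final class Entry {
        private final long timestamp;
        private final boolean leftSide;
        private final String key;

        Entry(long timestamp, boolean leftSide, String key) {
            this.timestamp = timestamp;
            this.leftSide = leftSide;
            this.key = key;
        }

        long timestamp() { return timestamp; }
        boolean leftSide() { return leftSide; }
        String key() { return key; }
    }

    // Assumed store order: timestamp first, then join side, then key.
    static final Comparator<Entry> STORE_ORDER =
            Comparator.comparingLong(Entry::timestamp)
                    .thenComparing(Entry::leftSide)
                    .thenComparing(Entry::key);

    // Collects entries older than closeTime and stops at the first one that is not,
    // whichever join side it belongs to: every later entry is at least as new.
    static List<Entry> emitExpired(TreeSet<Entry> store, long closeTime) {
        List<Entry> emitted = new ArrayList<>();
        for (Entry entry : store) {
            if (entry.timestamp() >= closeTime) {
                break;
            }
            emitted.add(entry);
        }
        return emitted;
    }

    public static void main(String[] args) {
        TreeSet<Entry> store = new TreeSet<>(STORE_ORDER);
        store.add(new Entry(5L, true, "a"));
        store.add(new Entry(1L, false, "b"));
        store.add(new Entry(9L, true, "c"));
        for (Entry e : emitExpired(store, 6L)) {
            System.out.println(e.timestamp() + " " + e.key()); // "1 b", then "5 a"
        }
    }
}
```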
Re: [PR] MINOR; Make string from array [kafka]
jolshan commented on code in PR #15526: URL: https://github.com/apache/kafka/pull/15526#discussion_r1522294411 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1564,7 +1564,7 @@ object LogManager { Option(newTopicsImage.getPartition(topicId, partitionId)) match { case Some(partition) => if (!partition.replicas.contains(brokerId)) { -info(s"Found stray log dir $log: the current replica assignment ${partition.replicas} " + +info(s"Found stray log dir $log: the current replica assignment ${partition.replicas.mkString("[", ", ", "]")} " + Review Comment: Does it make sense for it to be `[${partition.replicas.mkString(",")}]`?
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1522294617 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -19,82 +19,230 @@ package kafka.admin import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig +import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers} import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.utils.Utils import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import java.util.Properties import scala.collection.{Map, Seq} import scala.jdk.CollectionConverters._ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { val topicName = "foo" + val topicNameWithCustomConfigs = "foo2" var adminClient: Admin = _ + var setOldMessageFormat: Boolean = false @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) -createTopic(topicName, 1, 1.toShort) -produceMessages() +createTopicWithConfig(topicName, new Properties()) adminClient = Admin.create(Map[String, Object]( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers() ).asJava) } @AfterEach override def tearDown(): Unit = { +setOldMessageFormat = false Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testEarliestOffset(quorum: String): Unit = { -val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest()) -assertEquals(0, earliestOffset.offset()) + def testThreeCompressedRecordsInOneBatch(quorum: 
String): Unit = { +produceMessagesInOneBatch("gzip") +verifyListOffsets() + +// test LogAppendTime case +val props: Properties = new Properties() +props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime") +createTopicWithConfig(topicNameWithCustomConfigs, props) +produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs) +// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch. +// So in this one batch test, it'll be the first offset 0 +verifyListOffsets(topic = topicNameWithCustomConfigs, 0) } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testLatestOffset(quorum: String): Unit = { -val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest()) -assertEquals(3, latestOffset.offset()) + def testThreeRecordsInSeparateBatch(quorum: String): Unit = { +produceMessagesInSeparateBatch() +verifyListOffsets() + +// test LogAppendTime case +val props: Properties = new Properties() +props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime") +createTopicWithConfig(topicNameWithCustomConfigs, props) +produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs) +// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch. +// So in this separate batch test, it'll be the last offset 2 +verifyListOffsets(topic = topicNameWithCustomConfigs, 2) Review Comment: Good point! Let me improve it!
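The separate-batch `LogAppendTime` assertion quoted above expects offset 2. That only holds if the broker clock moved between the three appends, which is the question raised later in the thread. Here is a minimal sketch of why, assuming the max-timestamp lookup keeps the earliest offset when timestamps tie. `offsetOfMaxTimestamp` is a hypothetical helper, not the broker's code:

```java
// Hypothetical helper: offset of the first record carrying the largest timestamp.
// Ties keep the earlier offset (strict '>'), an assumption made for illustration.
final class MaxTimestampOffset {

    static long offsetOfMaxTimestamp(long baseOffset, long[] timestamps) {
        long maxTimestamp = Long.MIN_VALUE;
        long offset = -1L;
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] > maxTimestamp) {
                maxTimestamp = timestamps[i];
                offset = baseOffset + i;
            }
        }
        return offset;
    }

    public static void main(String[] args) {
        // Three single-record batches whose append times advanced.
        System.out.println(offsetOfMaxTimestamp(0L, new long[] {100L, 101L, 102L})); // 2
        // The same three batches appended within the same millisecond.
        System.out.println(offsetOfMaxTimestamp(0L, new long[] {100L, 100L, 100L})); // 0
    }
}
```

If all three appends land in the same millisecond, the sketch reports offset 0 instead of 2. Under that assumption, the test could be flaky unless something guarantees that time advances between appends.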
Re: [PR] MINOR; Make string from array [kafka]
jolshan commented on code in PR #15526: URL: https://github.com/apache/kafka/pull/15526#discussion_r1522293491 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1564,7 +1564,7 @@ object LogManager { Option(newTopicsImage.getPartition(topicId, partitionId)) match { case Some(partition) => if (!partition.replicas.contains(brokerId)) { -info(s"Found stray log dir $log: the current replica assignment ${partition.replicas} " + +info(s"Found stray log dir $log: the current replica assignment ${partition.replicas.mkString("[", ", ", "]")} " + Review Comment: Are we assuming 3 replicas here?
[PR] MINOR; Make string from array [kafka]
jsancio opened a new pull request, #15526: URL: https://github.com/apache/kafka/pull/15526 If toString is called on an array, it returns a string representing the object reference rather than the array's elements. Use mkString instead to print the contents of the array. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
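The problem described above is a JVM one: a Scala `Array` is a plain JVM array, and JVM arrays inherit `Object.toString()`. Here is a minimal Java sketch, with an `int[]` standing in for the replica assignment (an assumption made for illustration):

```java
import java.util.Arrays;

// Illustrative only: why logging an array directly prints a reference, not the contents.
final class ArrayToStringDemo {

    // Arrays do not override Object.toString(): the result is the type code "[I",
    // an '@', and the identity hash code in hex, e.g. something like "[I@1b6d3586".
    static String viaObjectToString(int[] replicas) {
        return replicas.toString();
    }

    // Arrays.toString formats the elements instead.
    static String viaArraysToString(int[] replicas) {
        return Arrays.toString(replicas);
    }

    public static void main(String[] args) {
        int[] replicas = {1, 2, 3};
        System.out.println("current replica assignment " + viaObjectToString(replicas));
        System.out.println("current replica assignment " + viaArraysToString(replicas)); // ... [1, 2, 3]
    }
}
```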
Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]
hgeraldino commented on PR #15506: URL: https://github.com/apache/kafka/pull/15506#issuecomment-1992692291 Thanks @OmniaGM for your review
Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]
junrao commented on code in PR #15476: URL: https://github.com/apache/kafka/pull/15476#discussion_r1522196043 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ## @@ -293,11 +293,11 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, if (timestampType == TimestampType.LOG_APPEND_TIME) { maxTimestamp = now; -offsetOfMaxTimestamp = initialOffset; -} - -if (toMagic >= RecordBatch.MAGIC_VALUE_V2) { -offsetOfMaxTimestamp = offsetCounter.value - 1; +if (toMagic >= RecordBatch.MAGIC_VALUE_V2) { +offsetOfMaxTimestamp = offsetCounter.value - 1; Review Comment: @johnnychhsu : Yes, that was the original code. But the issue is that that code was no longer correct when [KIP-734](https://cwiki.apache.org/confluence/display/KAFKA/KIP-734%3A+Improve+AdminClient.listOffsets+to+return+timestamp+and+offset+for+the+record+with+the+largest+timestamp) was added. Without KIP-734, it's ok to set offsetOfMaxTimestamp to the last offset for magic V2. However, with KIP-734, it's important to set offsetOfMaxTimestamp to the first offset for both magic V1 and V2. It seems that we missed that part when adding KIP-734. So, I think the logic should be if `timestampType == TimestampType.LOG_APPEND_TIME`, we always set `offsetOfMaxTimestamp` to `initialOffset`.
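The rule proposed in the comment can be sketched as follows. This is a simplified, hypothetical model of the non-compressed path rather than `LogValidator` itself: the enum, the method name and the `long[]` result are made up for illustration, and magic-version handling is left out because, per the comment, `LOG_APPEND_TIME` should pick `initialOffset` for both V1 and V2:

```java
// Hypothetical, simplified model of the rule discussed above; not LogValidator's code.
final class NonCompressedMaxTimestampSketch {

    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    // Returns {maxTimestamp, offsetOfMaxTimestamp} for records at initialOffset, initialOffset + 1, ...
    static long[] maxTimestampAndOffset(TimestampType type, long initialOffset,
                                        long[] recordTimestamps, long now) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            // Every record is stamped with 'now', so the first offset already carries
            // the max timestamp, independent of the magic version.
            return new long[] {now, initialOffset};
        }
        // CREATE_TIME: report the first record holding the largest producer timestamp.
        long maxTimestamp = Long.MIN_VALUE;
        long offsetOfMaxTimestamp = -1L;
        for (int i = 0; i < recordTimestamps.length; i++) {
            if (recordTimestamps[i] > maxTimestamp) {
                maxTimestamp = recordTimestamps[i];
                offsetOfMaxTimestamp = initialOffset + i;
            }
        }
        return new long[] {maxTimestamp, offsetOfMaxTimestamp};
    }
}
```

Under this model, a three-record batch appended at offset 10 with `LOG_APPEND_TIME` reports offset 10, not 12.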
Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]
hgeraldino commented on code in PR #15506: URL: https://github.com/apache/kafka/pull/15506#discussion_r1522194636 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java: ## @@ -1215,17 +1195,13 @@ public void testSuppressCloseErrors() { workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); -try { -workerTask.execute(); -fail("workerTask.execute should have thrown an exception"); -} catch (ConnectException e) { -assertSame("Exception from put should be the cause", putException, e.getCause()); -assertTrue("Exception from close should be suppressed", e.getSuppressed().length > 0); -assertSame(closeException, e.getSuppressed()[0]); -} + +Throwable thrownException = assertThrows(ConnectException.class, () -> workerTask.execute()); +assertSame("Exception from put should be the cause", putException, thrownException.getCause()); Review Comment: This is a remnant of the original implementation. As you correctly guessed, there is no difference in this case between using `assertEquals` and `assertSame`: neither the `RuntimeException` class nor any of its parents override the `equals()` method, so they both are effectively checking for reference equality (as implemented by the base `Object` class). I'm happy to change these to `assertEquals` (this is the only place where this is used).
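The point about `assertSame` versus `assertEquals` can be checked directly. Here is a minimal sketch that runs without JUnit on the classpath. `expectThrows` is a hypothetical stand-in for `assertThrows`, and `IllegalStateException` stands in for Connect's `ConnectException`:

```java
// Illustrative only: expectThrows is a hypothetical stand-in for JUnit's assertThrows.
final class ExceptionIdentityDemo {

    interface ThrowingRunnable {
        void run() throws Throwable;
    }

    // Runs the body and returns the thrown exception if it has the expected type.
    static <T extends Throwable> T expectThrows(Class<T> type, ThrowingRunnable body) {
        try {
            body.run();
        } catch (Throwable t) {
            if (type.isInstance(t)) {
                return type.cast(t);
            }
            throw new AssertionError("Unexpected exception type: " + t, t);
        }
        throw new AssertionError("Expected " + type.getName() + " to be thrown");
    }

    public static void main(String[] args) {
        RuntimeException putException = new RuntimeException("put failed");
        RuntimeException closeException = new RuntimeException("close failed");

        IllegalStateException thrown = expectThrows(IllegalStateException.class, () -> {
            IllegalStateException e = new IllegalStateException("task failed", putException);
            e.addSuppressed(closeException);
            throw e;
        });

        // Throwable does not override equals(), so equals() is reference equality here.
        System.out.println(thrown.getCause() == putException);                        // true
        System.out.println(thrown.getCause().equals(putException));                   // true
        System.out.println(putException.equals(new RuntimeException("put failed")));  // false
        System.out.println(thrown.getSuppressed()[0] == closeException);              // true
    }
}
```

Because equality and identity coincide for these exceptions, switching the assertion to `assertEquals` would not change what the test checks.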
Re: [PR] MINOR: Tweak streams config doc [kafka]
mjsax merged PR #15518: URL: https://github.com/apache/kafka/pull/15518
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1522184075 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamWindowCloseTest.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.test.MockApiProcessor; +import org.apache.kafka.test.MockApiProcessorSupplier; +import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static java.time.Duration.ofMillis; +import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; + +public class KStreamKStreamWindowCloseTest { + +private static final String LEFT = "left"; +private static final String RIGHT = "right"; +private final static Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); +private static final Consumed CONSUMED = Consumed.with(Serdes.Integer(), Serdes.String()); +private static final JoinWindows WINDOW = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5)); + +static List streams() { +return Arrays.asList( +innerJoin(), +leftJoin(), +outerJoin() +); +} + +private static Arguments innerJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, 
CONSUMED).join( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier processorSupplier = new MockApiProcessorSupplier<>(); +stream.process(processorSupplier); +return Arguments.of(builder.build(PROPS), processorSupplier); +} + +private static Arguments leftJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, CONSUMED).leftJoin( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier processorSupplier = new MockApiProcessorSupplier<>(); +stream.process(processorSupplier); +return Arguments.of(builder.build(PROPS), processorSupplier); +} + +private static Arguments outerJoin() { +final StreamsBuilder builder = new StreamsBuilder(); +final KStream stream = builder.stream(LEFT, CONSUMED).outerJoin( +builder.stream(RIGHT, CONSUMED), +MockValueJoiner.TOSTRING_JOINER, +WINDOW, +StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) +); +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +stream.process(supplier); +return Arguments.of(builder.build(PROPS), supplier); +} + +@ParameterizedTest +@MethodSource("streams") +public void recordsArrivingPostWindowCloseShouldBeDropped( +final Topology topology, +final MockApiProcessorSupplier
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
junrao commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1522164475 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -19,82 +19,230 @@ package kafka.admin import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig +import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers} import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.utils.Utils import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import java.util.Properties import scala.collection.{Map, Seq} import scala.jdk.CollectionConverters._ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { val topicName = "foo" + val topicNameWithCustomConfigs = "foo2" var adminClient: Admin = _ + var setOldMessageFormat: Boolean = false @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) -createTopic(topicName, 1, 1.toShort) -produceMessages() +createTopicWithConfig(topicName, new Properties()) adminClient = Admin.create(Map[String, Object]( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers() ).asJava) } @AfterEach override def tearDown(): Unit = { +setOldMessageFormat = false Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testEarliestOffset(quorum: String): Unit = { -val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest()) -assertEquals(0, earliestOffset.offset()) + def testThreeCompressedRecordsInOneBatch(quorum: 
String): Unit = { +produceMessagesInOneBatch("gzip") +verifyListOffsets() + +// test LogAppendTime case +val props: Properties = new Properties() +props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime") +createTopicWithConfig(topicNameWithCustomConfigs, props) +produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs) +// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch. +// So in this one batch test, it'll be the first offset 0 +verifyListOffsets(topic = topicNameWithCustomConfigs, 0) } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testLatestOffset(quorum: String): Unit = { -val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest()) -assertEquals(3, latestOffset.offset()) + def testThreeRecordsInSeparateBatch(quorum: String): Unit = { +produceMessagesInSeparateBatch() +verifyListOffsets() + +// test LogAppendTime case +val props: Properties = new Properties() +props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime") +createTopicWithConfig(topicNameWithCustomConfigs, props) +produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs) +// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch. +// So in this separate batch test, it'll be the last offset 2 +verifyListOffsets(topic = topicNameWithCustomConfigs, 2) Review Comment: Do we guarantee that the server time has advanced after appending each batch? 
Ditto for `testThreeRecordsInSeparateBatchWithMessageConversion` ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -52,20 +53,24 @@ public class GetOffsetShellTest { private final int topicCount = 4; private final int offsetTopicPartitionCount = 4; private final ClusterInstance cluster; +private final String topicName = "topic"; public GetOffsetShellTest(ClusterInstance cluster) { this.cluster = cluster; } private String getTopicName(int i) { -return "topic" + i; +return topicName + i; } -public void setUp() { +@BeforeEach +public void before() { cluster.config().serverProperties().put("auto.create.topics.enable", false); cluster.config().serverProperties().put("offsets.topic.replication.factor", "1"); cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount)); +} +public void setUp() { Review Comment: Ok. Could we make this setUp() private?
Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]
nizhikov commented on PR #14471: URL: https://github.com/apache/kafka/pull/14471#issuecomment-1992664069 @chia7712 I also ran the system tests for the command. All system tests passed.
```
SESSION REPORT (ALL TESTS)
ducktape version: 0.11.4
session_id: 2024-03-12--004
run time: 5 minutes 22.467 seconds
tests run: 12
passed: 12
flaky: 0
failed: 0
ignored: 0

test_id: kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_describe_consumer_group.security_protocol=PLAINTEXT.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
status: PASS
run time: 26.101 seconds

test_id: kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_describe_consumer_group.security_protocol=PLAINTEXT.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
status: PASS
run time: 25.893 seconds

test_id: kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_describe_consumer_group.security_protocol=PLAINTEXT.metadata_quorum=ZK.use_new_coordinator=False
status: PASS
run time: 21.294 seconds

test_id: kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_describe_consumer_group.security_protocol=SSL.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
status: PASS
run time: 31.293 seconds

test_id: kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_describe_consumer_group.security_protocol=SSL.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
status: PASS
run time: 31.091 seconds

test_id: kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_describe_consumer_group.security_protocol=SSL.metadata_quorum=ZK.use_new_coordinator=False
status: PASS
run time: 25.902 seconds

test_id: kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_list_consumer_groups.security_protocol=PLAINTEXT.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
status: PASS
run time: 25.983 seconds

test_id: kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_list_consumer_groups.security_protocol=PLAINTEXT.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
status: PASS
run time: 26.154 seconds

test_id: kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_list_consumer_groups.security_protocol=PLAINTEXT.metadata_quorum=ZK.use_new_coordinator=False
status: PASS
run time: 20.705 seconds

test_id: kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_list_consumer_groups.security_protocol=SSL.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
status: PASS
run time: 30.682 seconds

test_id: kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_list_consumer_groups.security_protocol=SSL.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
status: PASS
run time: 30.673 seconds

test_id: kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_list_consumer_groups.security_protocol=SSL.metadata_quorum=ZK.use_new_coordinator=False
status: PASS
run time: 26.424 seconds

nizhikov@v10792:~/kafka$ git status
On branch KAFKA-14589
Your branch is up to date with 'origin/KAFKA-14589'.

nothing to commit, working tree clean
nizhikov@v10792:~/kafka$ git remote -v
origin https://github.com/nizhikov/kafka.git (fetch)
origin https://github.com/nizhikov/kafka.git (push)
```
Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]
nizhikov commented on code in PR #14471: URL: https://github.com/apache/kafka/pull/14471#discussion_r1522059985 ## build.gradle: ## @@ -1999,10 +1999,14 @@ project(':tools') { implementation project(':clients') implementation project(':storage') implementation project(':server-common') +implementation project(':server') Review Comment: Great catch, thanks! It seems this dependency is not required, so I removed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective
[ https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825794#comment-17825794 ] Ismael Juma commented on KAFKA-16359: - [~norrisjeremy] it's clearly a bug; there is no debate about that. But it's good to confirm the impact and whether there is a workaround. That will influence the urgency of the new release (i.e. does it have to be immediate, or can it wait a week or two so that other important issues are fixed as well). Thanks for confirming the specifics. > kafka-clients-3.7.0.jar published to Maven Central is defective > --- > > Key: KAFKA-16359 > URL: https://issues.apache.org/jira/browse/KAFKA-16359 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.7.0 >Reporter: Jeremy Norris >Assignee: Apoorv Mittal >Priority: Critical > > The {{kafka-clients-3.7.0.jar}} that has been published to Maven Central is > defective: its {{META-INF/MANIFEST.MF}} bogusly includes a {{Class-Path}} > element: > {code} > Manifest-Version: 1.0 > Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10 > .5.jar slf4j-api-1.7.36.jar > {code} > This bogus {{Class-Path}} element leads to compiler warnings for projects > that utilize it as a dependency: > {code} > [WARNING] [path] bad path element > ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar": > no such file or directory > [WARNING] [path] bad path element > ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar": > no such file or directory > [WARNING] [path] bad path element > ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar": > no such file or directory > [WARNING] [path] bad path element > ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar": > no such file or directory > {code} > Either the {{kafka-clients-3.7.0.jar}} needs to be respun and published > without the bogus {{Class-Path}} element in its 
{{META-INF/MANIFEST.MF}} or > a new release should be published that corrects this defect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
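The warnings come from javac following the manifest's `Class-Path` entries. The sketch below uses only the JDK and has no Kafka dependency. It parses a manifest shaped like the quoted one with `java.util.jar.Manifest` and resolves each entry against the jar's own directory. That resolution is consistent with the paths in the quoted warnings, which all sit under `.../kafka-clients/3.7.0/`. The class name `ClassPathInspector` and the repository path are illustrative assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

public class ClassPathInspector {

    // Parses the Class-Path attribute of a manifest and resolves each entry
    // against the directory that contains the jar.
    static List<Path> resolveClassPath(String manifestText, Path jarFile) {
        Manifest manifest;
        try {
            manifest = new Manifest(
                    new ByteArrayInputStream(manifestText.getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String classPath = manifest.getMainAttributes().getValue(Attributes.Name.CLASS_PATH);
        List<Path> resolved = new ArrayList<>();
        if (classPath == null) {
            return resolved; // no Class-Path attribute, nothing extra to resolve
        }
        Path jarDir = jarFile.getParent();
        for (String entry : classPath.trim().split("\\s+")) {
            resolved.add(jarDir.resolve(entry));
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Same content as the quoted manifest. The line starting with a single space
        // is a continuation line, so the parser re-joins "snappy-java-1.1.10" and ".5.jar".
        String manifestText = "Manifest-Version: 1.0\n"
                + "Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10\n"
                + " .5.jar slf4j-api-1.7.36.jar\n";
        // Hypothetical location of the jar in a local Maven repository.
        Path jar = Paths.get("/home/user/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar");
        for (Path entry : resolveClassPath(manifestText, jar)) {
            // None of these files exist next to the jar in a Maven repository layout.
            System.out.println(entry);
        }
    }
}
```

Every resolved path lands in the `kafka-clients/3.7.0/` directory. A Maven repository keeps each dependency in its own group/artifact/version directory, so none of those files exist there.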
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1522051894 ## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ## @@ -243,33 +243,20 @@ public MemoryRecords build() { /** * Get the max timestamp and its offset. The details of the offset returned are a bit subtle. * - * If the log append time is used, the offset will be the last offset unless no compression is used and Review Comment: Could you please update `OffsetSpec` also? https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java#L65
[PR] Ctr end offsets should not throw [kafka]
philipnee opened a new pull request, #15525: URL: https://github.com/apache/kafka/pull/15525 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective
[ https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825772#comment-17825772 ] Apoorv Mittal commented on KAFKA-16359: --- Until the fix is released, the workaround below might suppress the warning (not a good practice, but it works for now): {code:java} org.apache.maven.plugins maven-compiler-plugin true true true ... ... -Xlint:-path ... {code} A similar shadow-jar issue was encountered by LaunchDarkly and others; see https://github.com/launchdarkly/java-server-sdk/issues/240
[jira] [Commented] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective
[ https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825770#comment-17825770 ] Jeremy Norris commented on KAFKA-16359: --- It's broken because users who use {{kafka-clients-3.7.0.jar}} as a dependency in their projects will get spurious compiler warnings for something that isn't a defect in their code. And if they also use the {{-Werror}} compiler option to promote compiler warnings into errors (as some users like to do to enforce cleanliness in their code), their projects and CI pipelines will fail. Bottom line: users should not have to adjust compiler settings in order to consume the artifact.
[jira] [Commented] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective
[ https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825767#comment-17825767 ] Ismael Juma commented on KAFKA-16359: - The original message said there were warnings. But the latest says the artifacts are broken. Can you clarify the impact please?
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
jolshan commented on code in PR #15486: URL: https://github.com/apache/kafka/pull/15486#discussion_r1521977816 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -42,6 +42,11 @@ object AddPartitionsToTxnManager { val VerificationTimeMsMetricName = "VerificationTimeMs" } +object OperationExpected extends Enumeration { + type operation = Value + val defaultOperation, latestProduceVersion, addPartitionWithoutVerification = Value Review Comment: I also think that in Scala we typically use sealed traits and case objects. See https://github.com/apache/kafka/blob/58ddd693e69599b177d09c2e384f31e7f5e11171/core/src/main/scala/kafka/controller/KafkaController.scala#L59
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
jolshan commented on code in PR #15486: URL: https://github.com/apache/kafka/pull/15486#discussion_r1521974720 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -42,6 +42,11 @@ object AddPartitionsToTxnManager { val VerificationTimeMsMetricName = "VerificationTimeMs" } +object OperationExpected extends Enumeration { + type operation = Value + val defaultOperation, latestProduceVersion, addPartitionWithoutVerification = Value Review Comment: I think we could have something like `default`, `genericError`, and `addPartition`?
[jira] [Commented] (KAFKA-16352) Transaction may get stuck in PrepareCommit or PrepareAbort state
[ https://issues.apache.org/jira/browse/KAFKA-16352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825766#comment-17825766 ] Artem Livshits commented on KAFKA-16352: That's correct: the transaction has fully completed, but it cannot transition into the complete state because its internal state was broken by a race condition. > Transaction may get stuck in PrepareCommit or PrepareAbort state > > > Key: KAFKA-16352 > URL: https://issues.apache.org/jira/browse/KAFKA-16352 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Artem Livshits >Assignee: Artem Livshits >Priority: Major > > A transaction took a long time to complete; trying to restart a producer > would lead to CONCURRENT_TRANSACTION errors. Investigation has shown that > the transaction was stuck in PrepareCommit for a few days: > (current time when the investigation happened: Feb 27 2024), transaction > state: > {{Type |Name |Value}} > {{-}} > {{ref |transactionalId |xxx-yyy}} > {{long |producerId |299364}} > {{ref |state |kafka.coordinator.transaction.PrepareCommit$ > @ 0x44fe22760}} > {{long |txnStartTimestamp |1708619624810 Thu Feb 22 2024 16:33:44.810 > GMT+}} > {{long |txnLastUpdateTimestamp|1708619625335 Thu Feb 22 2024 16:33:45.335 > GMT+}} > {{-}} > The partition list was empty and transactionsWithPendingMarkers didn't > contain the reference to the transactional state. In the log there were the > following relevant messages: > {{22 Feb 2024 @ 16:33:45.623 UTC [Transaction State Manager 1]: Completed > loading transaction metadata from __transaction_state-3 for coordinator epoch > 611}} > (this is the partition that contains the transactional id). After the data > is loaded, it sends out markers, etc.
> Then there is this message: > {{22 Feb 2024 @ 16:33:45.696 UTC [Transaction Marker Request Completion > Handler 4]: Transaction coordinator epoch for xxx-yyy has changed from 610 to > 611; cancel sending transaction markers TxnMarkerEntry{producerId=299364, > producerEpoch=1005, coordinatorEpoch=610, result=COMMIT, > partitions=[foo-bar]} to the brokers}} > This message is logged just before the state is removed from > transactionsWithPendingMarkers, but the state apparently contained the entry > that was created by the load operation. So the sequence of events probably > looked like the following: > # partition load completed > # commit markers were sent for transactional id xxx-yyy; entry in > transactionsWithPendingMarkers was created > # zombie reply from the previous epoch completed, removed entry from > transactionsWithPendingMarkers > # commit markers properly completed, but couldn't transition to > CommitComplete state because transactionsWithPendingMarkers didn't have the > proper entry, so it got stuck there until the broker was restarted > Looking at the code, there are a few cases that could lead to similar race > conditions. The fix is to keep track of the PendingCompleteTxn value that > was used when sending the marker, so that we can only remove the state that > was created when the marker was sent and not accidentally remove the state > someone else created.
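The race and the proposed fix can be illustrated with a plain `ConcurrentHashMap`. An unconditional `remove(key)` issued by the zombie epoch-610 completion also deletes the entry that the epoch-611 load just created. The two-argument `remove(key, value)` deletes the mapping only while it still holds the exact value that the caller inserted. This is a single-threaded, simplified replay. The record `PendingTxn` is a hypothetical stand-in for `PendingCompleteTxn`, and it is an assumption that the epoch-610 entry is still mapped when the reload happens.

```java
import java.util.concurrent.ConcurrentHashMap;

public class PendingMarkersRace {

    // Hypothetical stand-in for PendingCompleteTxn: one value per marker-sending attempt.
    record PendingTxn(String transactionalId, int coordinatorEpoch) {}

    // Replays the event sequence for one transactional id and reports whether the
    // entry created for epoch 611 survives the zombie epoch-610 cleanup.
    static boolean replay(boolean conditionalRemove) {
        ConcurrentHashMap<String, PendingTxn> pendingMarkers = new ConcurrentHashMap<>();

        // The old coordinator epoch (610) had markers in flight for xxx-yyy.
        PendingTxn stale = new PendingTxn("xxx-yyy", 610);
        pendingMarkers.put("xxx-yyy", stale);

        // Partition load at epoch 611 completes and commit markers are sent again,
        // replacing the entry for the same transactional id.
        PendingTxn current = new PendingTxn("xxx-yyy", 611);
        pendingMarkers.put("xxx-yyy", current);

        // The zombie reply from epoch 610 completes and cleans up "its" entry.
        if (conditionalRemove) {
            // Removes the mapping only if it still maps to `stale`; here it does not.
            pendingMarkers.remove("xxx-yyy", stale);
        } else {
            // Removes whatever is mapped, i.e. the epoch-611 entry.
            pendingMarkers.remove("xxx-yyy");
        }

        // The epoch-611 completion needs this entry to finish the transaction.
        return pendingMarkers.get("xxx-yyy") == current;
    }

    public static void main(String[] args) {
        System.out.println("unconditional remove keeps epoch-611 entry: " + replay(false)); // false
        System.out.println("conditional remove keeps epoch-611 entry:   " + replay(true));  // true
    }
}
```

Because records compare by value, the conditional remove only matches an entry with the same transactional id and coordinator epoch. That mirrors the idea of removing "only the state that was created when the marker was sent".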
Re: [PR] KAFKA-16352: Txn may get stuck in PrepareCommit or PrepareAbort s… [kafka]
artemlivshits commented on code in PR #15524: URL: https://github.com/apache/kafka/pull/15524#discussion_r1521968243 ## core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala: ## @@ -354,41 +366,42 @@ class TransactionMarkerChannelManager( txnLogAppend.newMetadata, appendCallback, _ == Errors.COORDINATOR_NOT_AVAILABLE, RequestLocal.NoCaching) } - def addTxnMarkersToBrokerQueue(transactionalId: String, - producerId: Long, + def addTxnMarkersToBrokerQueue(producerId: Long, producerEpoch: Short, result: TransactionResult, - coordinatorEpoch: Int, + pendingCompleteTxn: PendingCompleteTxn, Review Comment: pendingCompleteTxn already contains the transactional id and coordinator epoch, so we don't need to pass them explicitly. ## core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala: ## @@ -109,23 +109,30 @@ object TransactionMarkerChannelManager { } -class TxnMarkerQueue(@volatile var destination: Node) { +class TxnMarkerQueue(@volatile var destination: Node) extends Logging { // keep track of the requests per txn topic partition so we can easily clear the queue // during partition emigration - private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[TxnIdAndMarkerEntry]]().asScala + private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala Review Comment: Now we keep track of the PendingCompleteTxn that was added to transactionsWithPendingMarkers.
## core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala: ## @@ -419,25 +432,34 @@ class TransactionMarkerChannelManager( def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = { markersQueueForUnknownBroker.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { queue => - for (entry: TxnIdAndMarkerEntry <- queue.asScala) -removeMarkersForTxnId(entry.txnId) + for (entry <- queue.asScala) { +info(s"Removing $entry for txn partition $txnTopicPartitionId to destination broker -1") +removeMarkersForTxn(entry.pendingCompleteTxn) + } } -markersQueuePerBroker.foreach { case(_, brokerQueue) => +markersQueuePerBroker.foreach { case(brokerId, brokerQueue) => brokerQueue.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { queue => -for (entry: TxnIdAndMarkerEntry <- queue.asScala) - removeMarkersForTxnId(entry.txnId) +for (entry <- queue.asScala) { + info(s"Removing $entry for txn partition $txnTopicPartitionId to destination broker $brokerId") + removeMarkersForTxn(entry.pendingCompleteTxn) +} } } } - def removeMarkersForTxnId(transactionalId: String): Unit = { -transactionsWithPendingMarkers.remove(transactionalId) + def removeMarkersForTxn(pendingCompleteTxn: PendingCompleteTxn): Unit = { +val transactionalId = pendingCompleteTxn.transactionalId +val removed = transactionsWithPendingMarkers.remove(transactionalId, pendingCompleteTxn) Review Comment: This is the core change: use the original pendingCompleteTxn value to remove the entry. The rest of the change is mostly plumbing so that we can supply the correct pendingCompleteTxn.
Re: [PR] KAFKA-16318 : add javadoc for kafka metric [kafka]
chia7712 commented on code in PR #15483: URL: https://github.com/apache/kafka/pull/15483#discussion_r1521949713 ## clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java: ## @@ -40,15 +48,28 @@ public KafkaMetric(Object lock, MetricName metricName, MetricValueProvider va this.time = time; } +/** + * Get the configuration of this metric + * @return Return the config of this metric + */ public MetricConfig config() { return this.config; Review Comment: This is unrelated to this PR, but should we add a lock to this method? It seems we assume the config can be read and written concurrently.
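One way to address the question above is to guard both the getter and the setter with the `lock` object that the metric already receives in its constructor. The class below is a hypothetical, simplified holder, not the real `KafkaMetric`. A `String` stands in for `MetricConfig`, and it only illustrates the locking pattern under that assumption.

```java
public class LockedConfigHolder {
    private final Object lock;   // shared lock, as passed to the metric's constructor
    private String config;       // stands in for MetricConfig

    public LockedConfigHolder(Object lock, String initialConfig) {
        this.lock = lock;
        this.config = initialConfig;
    }

    // Read under the shared lock so a concurrent update is visible to this thread.
    public String config() {
        synchronized (lock) {
            return config;
        }
    }

    // Write under the same lock that readers use.
    public void config(String newConfig) {
        synchronized (lock) {
            this.config = newConfig;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Object sharedLock = new Object();
        LockedConfigHolder holder = new LockedConfigHolder(sharedLock, "window=30s");
        Thread writer = new Thread(() -> holder.config("window=60s"));
        writer.start();
        writer.join();
        System.out.println(holder.config()); // prints window=60s
    }
}
```

If the config is only ever replaced as a whole object, marking the field `volatile` would be a lighter alternative for visibility. The shared lock is the safer choice when a read must be consistent with a measurement done under the same lock.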
[jira] [Resolved] (KAFKA-9690) MemoryLeak in JMX Reporter
[ https://issues.apache.org/jira/browse/KAFKA-9690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-9690. --- Resolution: Duplicate Closing this as a duplicate of https://issues.apache.org/jira/browse/KAFKA-9306 > MemoryLeak in JMX Reporter > -- > > Key: KAFKA-9690 > URL: https://issues.apache.org/jira/browse/KAFKA-9690 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.4.0 >Reporter: Kaare Nilsen >Priority: Major > Attachments: image-2020-03-10-12-37-49-259.png, > image-2020-03-10-12-44-11-688.png > > > We use Kafka in a streaming HTTP application, creating a new consumer for each > incoming request. In version 2.4.0 we see memory build up > for each new consumer. A memory dump showed that the problem was in the JMX subsystem, > and while debugging it we found that one of the JMX beans > (kafka.consumer) builds up one consumer-metrics entry per consumer without releasing them > when the consumer is closed. > The relevant code is metricRemoval: > {code:java} > public void metricRemoval(KafkaMetric metric) { > synchronized (LOCK) { > MetricName metricName = metric.metricName(); > String mBeanName = getMBeanName(prefix, metricName); > KafkaMbean mbean = removeAttribute(metric, mBeanName); > if (mbean != null) { > if (mbean.metrics.isEmpty()) { > unregister(mbean); > mbeans.remove(mBeanName); > } else > reregister(mbean); > } > } > } > {code} > The check mbean.metrics.isEmpty() for this particular metric never yielded > true, so the mbean was never removed and the mbeans HashMap kept growing.
> The metrics that are not released are: > {code:java} > last-poll-seconds-ago > poll-idle-ratio-avg > time-between-poll-avg > time-between-poll-max > {code} > As a workaround, I use a modified JMXReporter in my own > project with the following close method: > {code:java} > public void close() { > synchronized (LOCK) { > for (KafkaMbean mbean : this.mbeans.values()) { > mbean.removeAttribute("last-poll-seconds-ago"); > mbean.removeAttribute("poll-idle-ratio-avg"); > mbean.removeAttribute("time-between-poll-avg"); > mbean.removeAttribute("time-between-poll-max"); > unregister(mbean); > } > } > } > {code} > This removes the attributes that are not cleaned up and prevents the > memory leak, but I have not found the root cause. > Another workaround is to use Kafka client 2.3.1. > > This is how it looks in the JMX console after a couple of clients have > connected and disconnected. Here you can see that the one metric builds up > and the old ones have the four attributes that make the unregister fail. > > !image-2020-03-10-12-37-49-259.png! > > This is how it looks after a while with Kafka client 2.3.1: > !image-2020-03-10-12-44-11-688.png! > As you can see, there is no leak here. > I suspect this change introduced the leak: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-517%3A+Add+consumer+metrics+to+observe+user+poll+behavior] > https://issues.apache.org/jira/browse/KAFKA-8874
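The leak mechanism described in the report can be modelled without Kafka or an `MBeanServer`. A registry keyed by bean name unregisters a bean only once its last attribute is gone. So if a few metrics are never removed on close, one bean per closed consumer stays behind. This is a simplified model of the quoted `metricRemoval` logic using plain maps. The class name, the two "cleaned-up" metric names, and the bean-name format are illustrative assumptions. Only the four left-behind metric names come from the report.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MBeanLeakModel {

    // Hypothetical names for metrics that ARE removed when a consumer closes.
    static final List<String> CLEANED_UP = List.of("cleaned-up-metric-1", "cleaned-up-metric-2");
    // The four metrics the report says are left behind.
    static final List<String> LEFT_BEHIND = List.of("last-poll-seconds-ago", "poll-idle-ratio-avg",
            "time-between-poll-avg", "time-between-poll-max");

    // mbean name -> attribute (metric) names still registered on that bean
    private final Map<String, Set<String>> mbeans = new HashMap<>();

    void metricChange(String mbeanName, String attribute) {
        mbeans.computeIfAbsent(mbeanName, k -> new HashSet<>()).add(attribute);
    }

    // Same shape as the quoted metricRemoval: drop the attribute and unregister
    // the bean only once it has no attributes left.
    void metricRemoval(String mbeanName, String attribute) {
        Set<String> attributes = mbeans.get(mbeanName);
        if (attributes == null) {
            return;
        }
        attributes.remove(attribute);
        if (attributes.isEmpty()) {
            mbeans.remove(mbeanName);
        }
    }

    // Creates and closes `consumers` consumers; returns how many beans remain registered.
    static int simulate(int consumers, boolean removeAllOnClose) {
        MBeanLeakModel reporter = new MBeanLeakModel();
        for (int i = 0; i < consumers; i++) {
            // Each consumer has its own client-id, hence its own bean name.
            String bean = "kafka.consumer:type=consumer-metrics,client-id=consumer-" + i;
            for (String m : CLEANED_UP) reporter.metricChange(bean, m);
            for (String m : LEFT_BEHIND) reporter.metricChange(bean, m);
            // close(): the buggy path removes only some of the metrics
            for (String m : CLEANED_UP) reporter.metricRemoval(bean, m);
            if (removeAllOnClose) {
                for (String m : LEFT_BEHIND) reporter.metricRemoval(bean, m);
            }
        }
        return reporter.mbeans.size();
    }

    public static void main(String[] args) {
        System.out.println("beans left, partial cleanup: " + simulate(100, false)); // 100
        System.out.println("beans left, full cleanup:    " + simulate(100, true));  // 0
    }
}
```

This is the same effect the reporter's modified `close()` works around: removing the four leftover attributes empties the bean, so it can be unregistered.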
[jira] [Resolved] (KAFKA-9504) Memory leak in KafkaMetrics registered to MBean
[ https://issues.apache.org/jira/browse/KAFKA-9504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-9504. --- Resolution: Duplicate Closing this as a duplicate of https://issues.apache.org/jira/browse/KAFKA-9306 > Memory leak in KafkaMetrics registered to MBean > --- > > Key: KAFKA-9504 > URL: https://issues.apache.org/jira/browse/KAFKA-9504 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.4.0 >Reporter: Andreas Holmén >Priority: Major > > After close() is called on a KafkaConsumer, some registered MBeans are not > unregistered, causing a leak. > > > {code:java} > import static > org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; > import java.lang.management.ManagementFactory; > import java.util.HashMap; > import java.util.Map; > import javax.management.MBeanServer; > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.common.serialization.ByteArrayDeserializer; > public class Leaker { > private static String bootstrapServers = "hostname:9092"; > > public static void main(String[] args) throws InterruptedException { > MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); > Map<String, Object> props = new HashMap<>(); > props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); > > int beans = mBeanServer.getMBeanCount(); > for (int i = 0; i < 100; i++) { >KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props, new > ByteArrayDeserializer(), new ByteArrayDeserializer()); >consumer.close(); > } > int newBeans = mBeanServer.getMBeanCount(); > System.out.println("\nbeans delta: " + (newBeans - beans)); > } > } > {code} > >
[PR] KAFKA-16352: Txn may get stuck in PrepareCommit or PrepareAbort s… [kafka]
artemlivshits opened a new pull request, #15524: URL: https://github.com/apache/kafka/pull/15524 …tate Now the removal of entries from the transactionsWithPendingMarkers map checks the value and all pending marker operations keep the value along with the operation state. This way, the pending marker operation can only delete the state it created and wouldn't accidentally delete the state from a different epoch (which could lead to "stuck" transactions). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1521937262 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -210,12 +213,12 @@ public class MembershipManagerImpl implements MembershipManager { private final Map assignedTopicNamesCache; /** - * Topic IDs and partitions received in the last target assignment. Items are added to this set - * every time a target assignment is received. This is where the member collects the assignment - * received from the broker, even though it may not be ready to fully reconcile due to missing - * metadata. + * Topic IDs and partitions received in the last target assignment, together with its local epoch. + * + * This member reassigned every time a new assignment is received. Review Comment: I don't quite get this sentence (was the intention just to start with `Reassigned every`... I guess...?)
Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]
chia7712 commented on code in PR #14471: URL: https://github.com/apache/kafka/pull/14471#discussion_r1521936219 ## build.gradle: ## @@ -1999,10 +1999,14 @@ project(':tools') { implementation project(':clients') implementation project(':storage') implementation project(':server-common') +implementation project(':server') Review Comment: Is this dependency necessary? We don't expect the tools module to depend on the core module.
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1521933499 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -889,43 +914,36 @@ private void transitionToStale() { */ void maybeReconcile() { if (targetAssignmentReconciled()) { -log.debug("Ignoring reconciliation attempt. Target assignment is equal to the " + +log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " + "current assignment."); return; } if (reconciliationInProgress) { -log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " + +log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " + currentTargetAssignment + " will be handled in the next reconciliation loop."); return; } // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update // if some topic IDs are not resolvable. SortedSet assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate(); +final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions); -SortedSet ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); -ownedPartitions.addAll(subscriptions.assignedPartitions()); - -// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are -// being reconciled. Needed for interactions with the centralized subscription state that -// does not support topic IDs yet, and for the callbacks. -SortedSet assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions); - -// Check same assignment. Based on topic names for now, until topic IDs are properly -// supported in the centralized subscription state object. 
Note that this check is -// required to make sure that reconciliation is not triggered if the assignment ready to -// be reconciled is the same as the current one (even though the member may remain -// in RECONCILING state if it has some unresolved assignments). -boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions); - -if (sameAssignmentReceived) { +if (resolvedAssignment.equals(currentAssignment)) { Review Comment: This short-circuits the reconciliation if the same assignment is received (same epoch, same partitions). But we also need to consider the case where we get the same partitions assigned but in a different epoch. In that case, we should not carry on with the full reconciliation process (there is truly nothing to reconcile), but we should send an ack to the broker, so I would expect we need a similar short-circuit for `if sameAssignmentInDiffEpoch => transitionToAck & return;`. I'm mainly thinking about this case:
- member owns t1-1 at epoch 3
- receives new assignment [t1-1, t2-1] at epoch 4 and gets stuck reconciling, e.g. because t2 metadata is missing
- receives new assignment [t1-1] at epoch 5 (e.g. the broker realized t2 has been deleted)
- member does not need to reconcile t1-1, but should send an ack to the broker for t1-1, which it received in a newer epoch
Does that make sense? I'm not sure whether I'm missing something regarding the expectations.
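The three outcomes discussed in the comment could be told apart with a small decision function: identical assignment, same partitions under a newer local epoch, and genuinely different partitions. This is a hypothetical sketch. `LocalAssignment` here is a simplified record holding a local epoch and a set of partition names, and `Action` is an illustrative enum. Neither mirrors the real `MembershipManagerImpl` types or its state transitions.

```java
import java.util.Set;

public class ReconcileDecision {

    // Simplified stand-in for the PR's LocalAssignment (local epoch + partitions).
    record LocalAssignment(long localEpoch, Set<String> partitions) {}

    enum Action { NOTHING, ACK_ONLY, RECONCILE }

    static Action decide(LocalAssignment current, LocalAssignment resolvedTarget) {
        if (resolvedTarget.equals(current)) {
            return Action.NOTHING;    // same epoch, same partitions
        }
        if (resolvedTarget.partitions().equals(current.partitions())) {
            return Action.ACK_ONLY;   // nothing to reconcile, but ack the newer epoch
        }
        return Action.RECONCILE;      // partitions differ: run the full reconciliation
    }

    public static void main(String[] args) {
        LocalAssignment owned = new LocalAssignment(3, Set.of("t1-1"));
        // Epoch 4 ([t1-1, t2-1]) is stuck on missing t2 metadata; epoch 5 drops t2.
        LocalAssignment epoch5 = new LocalAssignment(5, Set.of("t1-1"));
        System.out.println(decide(owned, epoch5)); // ACK_ONLY
    }
}
```

The `ACK_ONLY` branch corresponds to the suggested `sameAssignmentInDiffEpoch => transitionToAck` short-circuit. Whether the real code compares resolved or full target assignments is left open here.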
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510: URL: https://github.com/apache/kafka/pull/15510#discussion_r1521827156 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords( try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { TimestampedKeyAndJoinSide prevKey = null; +boolean outerJoinLeftBreak = false; +boolean outerJoinRightBreak = false; while (it.hasNext()) { -boolean outerJoinLeftBreak = false; -boolean outerJoinRightBreak = false; Review Comment: Currently, the while-loop in the emitNonJoinedOuterRecords() method iterates over ALL the left- and right-side outer-join records available in the outer-join store until it hits the **break;**. The idea of the two outerJoinBreak flags was to keep track of the point from which the windows of the outer-join records are not yet closed, but this is only useful if the KeyValueIterator is ordered by timestamped key, and it is not: the documentation says that the ordering of the KeyValueIterator is NOT guaranteed. So I now think it is better to remove the outerJoinBreak flags and simply check, for each outer-join record, whether it belongs to a closed window or not, without any optimization. If the window has closed, we emit a null-joined value; if the window is not closed yet, we continue with the next outer-join record. What do you think? @mjsax @florin-akermann
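A minimal sketch of the proposal above: visit every record and decide per record, with no early break. `OuterRecord` and the closing rule in `windowClosed` are hypothetical simplifications, not the Streams implementation; the real close condition depends on the join window and grace period configuration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for an entry in the outer-join store.
record OuterRecord(String key, long timestamp, boolean leftSide) { }

final class NonJoinedEmitter {
    // Assumed closing rule, for illustration only: a record's window is closed once
    // stream time has passed its timestamp plus the window size and grace period.
    static boolean windowClosed(OuterRecord r, long streamTime, long windowSizeMs, long graceMs) {
        return r.timestamp() + windowSizeMs + graceMs < streamTime;
    }

    // Visits every record, because the store iterator makes no ordering guarantee:
    // there is no early break, each record is checked on its own.
    static List<OuterRecord> emitClosed(Iterable<OuterRecord> store, long streamTime,
                                        long windowSizeMs, long graceMs) {
        List<OuterRecord> emitted = new ArrayList<>();
        for (OuterRecord r : store) {
            if (!windowClosed(r, streamTime, windowSizeMs, graceMs)) {
                continue; // window still open: skip it and keep scanning
            }
            emitted.add(r); // window closed: a null-joined result would be emitted here
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Deliberately not ordered by timestamp: the open-window record comes first.
        List<OuterRecord> store = List.of(
            new OuterRecord("c", 900, true),
            new OuterRecord("a", 500, true),
            new OuterRecord("b", 100, false));
        System.out.println(emitClosed(store, 1000, 300, 100));
    }
}
```

With a stream time of 1000 and 400 ms of window plus grace, records `a` and `b` are closed and `c` is not. A loop that broke at the first open window would stop at `c` and emit nothing.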
[jira] [Updated] (KAFKA-15250) ConsumerNetworkThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15250:
Summary: ConsumerNetworkThread is running tight loop (was: DefaultBackgroundThread is running tight loop)
> ConsumerNetworkThread is running tight loop
> Key: KAFKA-15250
> URL: https://issues.apache.org/jira/browse/KAFKA-15250
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Philip Nee
> Assignee: Kirk True
> Priority: Major
> Labels: consumer-threading-refactor, events
> Fix For: 3.8.0
>
> The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I think we need to re-examine the timeout passed to networkclientDelegate.poll.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
jolshan commented on code in PR #15486: URL: https://github.com/apache/kafka/pull/15486#discussion_r1521780875 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1007,7 +1010,8 @@ class ReplicaManager(val config: KafkaConfig, transactionalId, producerId, producerEpoch, - generalizedCallback + generalizedCallback, + ApiKeys.PRODUCE.latestVersion Review Comment: This will be done as a follow-up, as part of KIP-890 part 2. cc: @CalvinConfluent, who was also thinking about this part.
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
sjhajharia commented on code in PR #15486: URL: https://github.com/apache/kafka/pull/15486#discussion_r1521767609 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1007,7 +1010,8 @@ class ReplicaManager(val config: KafkaConfig, transactionalId, producerId, producerEpoch, - generalizedCallback + generalizedCallback, + ApiKeys.PRODUCE.latestVersion Review Comment: So for this particular path it would be 0 (default), and for the new workflow we added it would be the new error (1). Where would we use the third case, i.e. adding partitions rather than verifying?
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1521750747 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign */ private void replaceTargetAssignmentWithNewAssignment( ConsumerGroupHeartbeatResponseData.Assignment assignment) { -currentTargetAssignment.clear(); + Review Comment: 1. I did not do it because the `LocalAssignment` is leaked from this file via the `currentAssignment` method, and I didn't want to put so much logic in the public interface `MembershipManager`. However, I think I could define the return type of `currentAssignment` as a light interface, and then keep the fat class with all the updating logic in here. I'll give it a try. 2. I wouldn't call it `EMPTY` (to avoid confusion with an empty assignment), but rather `NONE` or something similar; other than that, it sounds like a good idea. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { } }); -// RebalanceTimeoutMs - only sent if has changed since the last heartbeat -if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { +// RebalanceTimeoutMs - only sent when joining or if has changed since the last heartbeat +if (membershipManager.memberEpoch() == 0 || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { Review Comment: True, this can be simplified. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -566,18 +567,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // ClientAssignors - not supported yet -// TopicPartitions - only sent if it has changed since the last heartbeat. Note that -// the string consists of just the topic ID and the partitions.
When an assignment is -// received, we might not yet know the topic name, and then it is learnt subsequently -// by a metadata update. -TreeSet assignedPartitions = membershipManager.currentAssignment().entrySet().stream() -.map(entry -> entry.getKey() + "-" + entry.getValue()) -.collect(Collectors.toCollection(TreeSet::new)); -if (!assignedPartitions.equals(sentFields.topicPartitions)) { +// TopicPartitions - sent with the first heartbeat after a new assignment from the server was +// reconciled. This is ensured by resending the topic partitions whenever the local assignment, +// including its local epoch is changed (although the local epoch is not sent in the heartbeat). +LocalAssignment local = membershipManager.currentAssignment(); +if (local == null) { +data.setTopicPartitions(Collections.emptyList()); +sentFields.topicPartitions = null; +} else if (!local.equals(sentFields.topicPartitions)) { Review Comment: That's what is being done. Maybe I should have renamed the field `sentFields.topicPartitions` to `sentFields.localAssignment`, but its type is now `LocalAssignment`, and `equals` compares the local epoch as well.
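The reply above relies on `LocalAssignment.equals` including the local epoch. A minimal sketch of that comparison, assuming a hypothetical `LocalAssignment` record and a hypothetical `SentFields` holder (the real `HeartbeatRequestManager` and request data classes look different); returning `null` stands in for "leave the field unset".

```java
import java.util.Set;

// Hypothetical simplified types; the real HeartbeatRequestManager and
// ConsumerGroupHeartbeatRequestData in PR #15511 look different.
record LocalAssignment(long localEpoch, Set<String> partitions) { }

final class SentFields {
    LocalAssignment localAssignment; // last assignment whose partitions were sent
}

final class TopicPartitionsField {
    /**
     * Returns the partitions to put in the next heartbeat, or null when the field can be
     * omitted because the same LocalAssignment (local epoch included) was already sent.
     */
    static Set<String> toSend(LocalAssignment current, SentFields sent) {
        if (current == null) {
            sent.localAssignment = null;
            return Set.of(); // no assignment yet: send an empty list
        }
        if (!current.equals(sent.localAssignment)) {
            sent.localAssignment = current;
            return current.partitions();
        }
        return null;
    }

    public static void main(String[] args) {
        SentFields sent = new SentFields();
        LocalAssignment first = new LocalAssignment(0, Set.of("t1-1"));
        System.out.println(toSend(first, sent)); // [t1-1]
        System.out.println(toSend(first, sent)); // null: unchanged, field omitted
        // Same partitions, but a new local epoch: sent again.
        System.out.println(toSend(new LocalAssignment(1, Set.of("t1-1")), sent)); // [t1-1]
    }
}
```

The third call shows why the epoch matters: the partition set is unchanged, yet the partitions are sent again because the local epoch moved.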
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1521745679 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign */ private void replaceTargetAssignmentWithNewAssignment( ConsumerGroupHeartbeatResponseData.Assignment assignment) { -currentTargetAssignment.clear(); + +// Return if the same as current assignment; comparison without creating a new collection +if (currentTargetAssignment != null) { +// check if the new assignment is different from the current target assignment +if (currentTargetAssignment.partitions.size() == assignment.topicPartitions().size() && +assignment.topicPartitions().stream().allMatch( +tp -> currentTargetAssignment.partitions.containsKey(tp.topicId()) && + currentTargetAssignment.partitions.get(tp.topicId()).size() == tp.partitions().size() && + currentTargetAssignment.partitions.get(tp.topicId()).containsAll(tp.partitions()))) { +return; +} +} + +// Bump local epoch and replace assignment +long nextLocalEpoch; +if (currentTargetAssignment == null) { +nextLocalEpoch = 0; +} else { +nextLocalEpoch = currentTargetAssignment.localEpoch + 1; +} Review Comment: Is there a reason why we need to compute epochs on the client here? The server is the one bumping the epochs whenever it computes a new assignment for a member. I was expecting that we would just keep a `LocalAssignment` containing the partitions and the epoch the broker sent in the `ConsumerGroupHeartbeatResponseData`.
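The quoted diff checks whether the incoming assignment equals the current target without building a new collection. A standalone sketch of that comparison, assuming a hypothetical `TopicPartitions` record and plain string topic IDs in place of Kafka's `Uuid`; it only illustrates the check, not where the epoch should come from.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simplified shape of one topic entry in the heartbeat response assignment.
record TopicPartitions(String topicId, List<Integer> partitions) { }

final class SameAssignmentCheck {
    // Mirrors the comparison in the quoted diff: same number of topics and, for every
    // incoming topic, the same number of partitions, all already held locally.
    // Assumes the incoming data has no duplicate topics or partitions; otherwise the
    // size checks could be fooled.
    static boolean sameAssignment(Map<String, Set<Integer>> current, List<TopicPartitions> incoming) {
        return current.size() == incoming.size()
            && incoming.stream().allMatch(tp ->
                current.containsKey(tp.topicId())
                    && current.get(tp.topicId()).size() == tp.partitions().size()
                    && current.get(tp.topicId()).containsAll(tp.partitions()));
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> current = Map.of("topicA", Set.of(0, 1));
        System.out.println(sameAssignment(current, List.of(new TopicPartitions("topicA", List.of(1, 0))))); // true
        System.out.println(sameAssignment(current, List.of(new TopicPartitions("topicA", List.of(0, 2))))); // false
    }
}
```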
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
ChrisAHolland commented on code in PR #15507: URL: https://github.com/apache/kafka/pull/15507#discussion_r1521727181 ## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ## @@ -35,24 +35,12 @@ public class BoundedList implements List { private final int maxLength; private final List underlying; -public static BoundedList newArrayBacked(int maxLength) { -return new BoundedList<>(maxLength, new ArrayList<>()); -} - -public static BoundedList newArrayBacked(int maxLength, int initialCapacity) { Review Comment: Looking at the usage here, I can't see a reason why this would be desirable. Maybe the intention was to have the option to save some space until it is absolutely necessary? However, as I mentioned in the PR description, this is not worth the resizing costs. Also, this method is guaranteed to add more records to the list than `ids.size()`, since it adds elements in multiple places.
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
ChrisAHolland commented on code in PR #15507: URL: https://github.com/apache/kafka/pull/15507#discussion_r1521713491 ## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ## @@ -35,24 +35,12 @@ public class BoundedList implements List { private final int maxLength; private final List underlying; -public static BoundedList newArrayBacked(int maxLength) { -return new BoundedList<>(maxLength, new ArrayList<>()); -} - -public static BoundedList newArrayBacked(int maxLength, int initialCapacity) { -return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity)); -} - -public BoundedList(int maxLength, List underlying) { Review Comment: I considered this at first, but then I realized that this constructor is only ever used in unit tests. For that reason, I decided to remove it and have the unit tests match how the class is used in the source code.
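One general reason to avoid a constructor that accepts a caller-supplied backing list is that the caller could pass a list that is already over the limit, or keep a reference and grow it directly. A minimal sketch of a bounded list that owns its backing `ArrayList`; `SimpleBoundedList` is a hypothetical class, not Kafka's `BoundedList`, and it throws a plain `IllegalStateException` for illustration.

```java
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal bounded list, not Kafka's BoundedList. The class creates and owns
// its backing ArrayList, so no caller can hand in a list that is already too long or
// keep a reference to the backing list and grow it past the bound.
final class SimpleBoundedList<E> extends AbstractList<E> {
    private final int maxLength;
    private final List<E> underlying = new ArrayList<>();

    SimpleBoundedList(int maxLength) {
        if (maxLength <= 0) {
            throw new IllegalArgumentException("maxLength must be positive, got " + maxLength);
        }
        this.maxLength = maxLength;
    }

    @Override public E get(int index) { return underlying.get(index); }
    @Override public int size() { return underlying.size(); }
    @Override public E set(int index, E element) { return underlying.set(index, element); }

    @Override
    public E remove(int index) {
        modCount++;
        return underlying.remove(index);
    }

    // AbstractList routes add(E) and addAll(...) through this method, so the bound
    // is enforced for every insertion path.
    @Override
    public void add(int index, E element) {
        if (underlying.size() >= maxLength) {
            throw new IllegalStateException("Cannot add another element: the limit is " + maxLength);
        }
        modCount++;
        underlying.add(index, element);
    }

    public static void main(String[] args) {
        List<String> list = new SimpleBoundedList<>(2);
        list.add("a");
        list.add("b");
        try {
            list.add("c");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```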
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
jolshan commented on code in PR #15486: URL: https://github.com/apache/kafka/pull/15486#discussion_r1521679508 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1007,7 +1010,8 @@ class ReplicaManager(val config: KafkaConfig, transactionalId, producerId, producerEpoch, - generalizedCallback + generalizedCallback, + ApiKeys.PRODUCE.latestVersion Review Comment: What if we send an offset commit request that can't yet handle the error? I think we may need a different data structure, maybe something like an enum to handle the various cases: 0 -- default, 1 -- new error, 2 -- add partitions rather than verifying.
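A minimal sketch of the enum idea, assuming the three cases listed in the comment; `SupportedOperation` and its constants are hypothetical names (the PR later moved to a Scala sealed trait whose members may differ). Passing an explicit enum instead of a raw produce version means a caller such as an offset commit path can state what it supports without borrowing another API's version number.

```java
// Hypothetical enum for the three cases listed above; not the PR's actual type.
enum SupportedOperation {
    DEFAULT,        // 0 -- default
    NEW_ERROR,      // 1 -- new error
    ADD_PARTITIONS; // 2 -- add partitions rather than verifying

    // A switch expression over an enum must cover every constant (or have a default),
    // so adding a fourth case fails compilation here until it is handled.
    String describe() {
        return switch (this) {
            case DEFAULT -> "default";
            case NEW_ERROR -> "new error";
            case ADD_PARTITIONS -> "add partitions rather than verifying";
        };
    }
}

final class SupportedOperationDemo {
    public static void main(String[] args) {
        for (SupportedOperation op : SupportedOperation.values()) {
            System.out.println(op.ordinal() + " -- " + op + ": " + op.describe());
        }
    }
}
```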
Re: [PR] KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test [kafka]
kamalcph commented on PR #15523: URL: https://github.com/apache/kafka/pull/15523#issuecomment-1991918079 > Thanks for the change, @kamalcph! Just for my knowledge, why is there a possibility that the Shutdownable thread never ran in the CI for this test? Due to resource constraints, `ShutdownableThread#run` might be called later than the `ShutdownableThread#close` method.
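A minimal sketch of the race and one common way to make a test deterministic: `Thread.start()` only schedules the thread, so a test that depends on `run()` having executed should wait for an explicit signal. `Worker` is a hypothetical stand-in, not Kafka's `ShutdownableThread`, and this is not necessarily how PR #15523 fixes the test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical worker standing in for a ShutdownableThread-style thread; not Kafka's class.
final class Worker extends Thread {
    final CountDownLatch started = new CountDownLatch(1);
    private final CountDownLatch stopRequested = new CountDownLatch(1);

    @Override
    public void run() {
        started.countDown(); // run() has really been entered
        try {
            stopRequested.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void shutdown() throws InterruptedException {
        stopRequested.countDown();
        join();
    }

    public static void main(String[] args) throws InterruptedException {
        Worker worker = new Worker();
        worker.start();
        // On a loaded CI machine, shutdown() could be reached before run() is entered,
        // so wait for that explicitly before asserting anything that depends on it.
        boolean ran = worker.started.await(10, TimeUnit.SECONDS);
        worker.shutdown();
        System.out.println("run() entered before shutdown: " + ran);
    }
}
```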
Re: [PR] KAFKA-15949: Unify metadata.version format in log and error message [kafka]
FrankYang0529 commented on code in PR #15505: URL: https://github.com/apache/kafka/pull/15505#discussion_r1521640919 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -139,7 +139,7 @@ object StorageTool extends Logging { action(storeTrue()) formatParser.addArgument("--release-version", "-r"). action(store()). - help(s"A KRaft release version to use for the initial metadata version. The minimum is 3.0, the default is ${MetadataVersion.LATEST_PRODUCTION.version()}") + help(s"A KRaft release version to use for the initial metadata.version. The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is ${MetadataVersion.LATEST_PRODUCTION.version()}") Review Comment: Hi @showuon, thanks for the review. Using `MetadataVersion.IBP_3_0_IV0` or `MetadataVersion.IBP_3_0_IV0.version()` gives the same result, because both the `toString` and `version` methods return `ibpVersion`: https://github.com/apache/kafka/blob/58ddd693e69599b177d09c2e384f31e7f5e11171/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L523-L525 https://github.com/apache/kafka/blob/58ddd693e69599b177d09c2e384f31e7f5e11171/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L632-L634
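A small illustration of why the two forms match: string concatenation in Java (and Scala's `s"..."` interpolation) calls `toString()` on the value, and here `toString()` returns the same field as `version()`. `MiniMetadataVersion` is a hypothetical one-constant stand-in, and the "3.0-IV0" string is assumed for illustration.

```java
// Hypothetical one-constant stand-in for MetadataVersion. As in the methods linked above,
// toString() and version() return the same field, so string concatenation (which calls
// toString()) and an explicit version() call produce identical text.
enum MiniMetadataVersion {
    IBP_3_0_IV0("3.0-IV0"); // version string assumed for illustration

    private final String ibpVersion;

    MiniMetadataVersion(String ibpVersion) {
        this.ibpVersion = ibpVersion;
    }

    public String version() {
        return ibpVersion;
    }

    @Override
    public String toString() {
        return ibpVersion;
    }

    public static void main(String[] args) {
        String viaToString = "The minimum is " + IBP_3_0_IV0;
        String viaVersion = "The minimum is " + IBP_3_0_IV0.version();
        System.out.println(viaToString);                    // The minimum is 3.0-IV0
        System.out.println(viaToString.equals(viaVersion)); // true
    }
}
```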
Re: [PR] KAFKA-16365: AssignmentsManager callback handling issues [kafka]
soarez commented on code in PR #15521: URL: https://github.com/apache/kafka/pull/15521#discussion_r1521634948 ## server/src/main/java/org/apache/kafka/server/AssignmentsManager.java: ## @@ -202,9 +199,12 @@ public void run() throws Exception { return; } if (existing.timestampNs > timestampNs) { -existing.onComplete(); -if (log.isDebugEnabled()) log.debug("Dropping assignment {} because it's older than {}", this, existing); +existing.merge(this); +if (log.isDebugEnabled()) log.debug("Dropping assignment {} because it's older than existing {}", this, existing); return; +} else { +this.merge(existing); Review Comment: Good point. Addressed in [3ebe6e1](https://github.com/apache/kafka/pull/15521/commits/3ebe6e12428dfe7b45d9bb1270c4e38e50eb19ab)
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1521632809 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { } }); -// RebalanceTimeoutMs - only sent if has changed since the last heartbeat -if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { +// RebalanceTimeoutMs - only sent when joining or if has changed since the last heartbeat +if (membershipManager.memberEpoch() == 0 || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { Review Comment: It is static, taken from `max.poll.interval.ms`, so I agree we could set it only when epoch == 0.
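A minimal sketch of the simplification agreed above: since the rebalance timeout is static, the decision depends only on whether the member is joining (epoch 0), with no `sentFields` comparison. `RebalanceTimeoutField` and the `NOT_SET` marker are hypothetical; the request schema defines the real default for an unset field.

```java
// Hypothetical sketch of the simplification discussed above; not the real heartbeat builder.
final class RebalanceTimeoutField {
    // Assumed "not set" marker for illustration; the request schema defines the real default.
    static final int NOT_SET = -1;

    static int valueToSend(int memberEpoch, int rebalanceTimeoutMs) {
        // The timeout comes from max.poll.interval.ms and does not change at runtime,
        // so only the joining heartbeat (member epoch 0) needs to carry it.
        return memberEpoch == 0 ? rebalanceTimeoutMs : NOT_SET;
    }

    public static void main(String[] args) {
        System.out.println(valueToSend(0, 300_000)); // 300000
        System.out.println(valueToSend(7, 300_000)); // -1
    }
}
```

Under this sketch, a member that rejoins with epoch 0 (for example after being fenced) would send the timeout again.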
Re: [PR] KAFKA-16365: AssignmentsManager callback handling issues [kafka]
gaurav-narula commented on code in PR #15521: URL: https://github.com/apache/kafka/pull/15521#discussion_r1521612209 ## server/src/main/java/org/apache/kafka/server/AssignmentsManager.java: ## @@ -202,9 +199,12 @@ public void run() throws Exception { return; } if (existing.timestampNs > timestampNs) { -existing.onComplete(); -if (log.isDebugEnabled()) log.debug("Dropping assignment {} because it's older than {}", this, existing); +existing.merge(this); +if (log.isDebugEnabled()) log.debug("Dropping assignment {} because it's older than existing {}", this, existing); Review Comment: I think we can avoid `log.isDebugEnabled()` and the like when using parameterised messages. See https://www.slf4j.org/faq.html#logging_performance ## server/src/main/java/org/apache/kafka/server/AssignmentsManager.java: ## @@ -202,9 +199,12 @@ public void run() throws Exception { return; } if (existing.timestampNs > timestampNs) { -existing.onComplete(); -if (log.isDebugEnabled()) log.debug("Dropping assignment {} because it's older than {}", this, existing); +existing.merge(this); +if (log.isDebugEnabled()) log.debug("Dropping assignment {} because it's older than existing {}", this, existing); return; +} else { +this.merge(existing); Review Comment: Doesn't this result in the callback handlers for `existing` being run twice if `existing` was retrieved from `inflight`?
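One way to rule out the double execution asked about above is for `merge` to transfer ownership of callbacks rather than copy them, plus a guard so completion runs at most once. A minimal sketch, assuming a hypothetical `PendingAssignment` class (not the event class in `AssignmentsManager`, and not necessarily what commit 3ebe6e1 does).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified assignment update carrying completion callbacks;
// not the event class used in AssignmentsManager.
final class PendingAssignment {
    final long timestampNs;
    private final List<Runnable> callbacks = new ArrayList<>();
    private boolean completed = false;

    PendingAssignment(long timestampNs, Runnable callback) {
        this.timestampNs = timestampNs;
        callbacks.add(callback);
    }

    // Moves the other update's callbacks into this one, so each callback is owned by
    // exactly one update and cannot be run by both.
    void merge(PendingAssignment other) {
        callbacks.addAll(other.callbacks);
        other.callbacks.clear();
    }

    // Runs the owned callbacks at most once, even if completion is signalled twice.
    void onComplete() {
        if (completed) {
            return;
        }
        completed = true;
        callbacks.forEach(Runnable::run);
    }

    public static void main(String[] args) {
        int[] runs = new int[2];
        PendingAssignment older = new PendingAssignment(1, () -> runs[0]++);
        PendingAssignment newer = new PendingAssignment(2, () -> runs[1]++);
        newer.merge(older);  // the older update is dropped, its callback is kept
        newer.onComplete();
        older.onComplete();  // e.g. older was also in flight: nothing left to run
        System.out.println(runs[0] + " " + runs[1]); // 1 1
    }
}
```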
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
dajac commented on code in PR #15511: URL: https://github.com/apache/kafka/pull/15511#discussion_r1521552751 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { } }); -// RebalanceTimeoutMs - only sent if has changed since the last heartbeat -if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { +// RebalanceTimeoutMs - only sent when joining or if has changed since the last heartbeat +if (membershipManager.memberEpoch() == 0 || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { Review Comment: If I'm not mistaken, `rebalanceTimeoutMs` is a static config, so we could actually set it only when joining the group (when epoch == 0). ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -508,9 +508,30 @@ private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign */ private void replaceTargetAssignmentWithNewAssignment( ConsumerGroupHeartbeatResponseData.Assignment assignment) { -currentTargetAssignment.clear(); + Review Comment: I have two high-level thoughts, but I am not sure whether they are worth it: 1) Have you considered moving all the update logic into `LocalAssignment`? We could have a method such as `updateWith(Assignment)` which returns an `Optional` containing the new assignment if it was updated. 2) Along similar lines, I wonder if we could have an `EMPTY` constant for the default `LocalAssignment(-1, null)` instead of relying on `null`. The reasoning is that it avoids having to deal with `null` in a few places in this file. What do you think?
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -566,18 +567,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // ClientAssignors - not supported yet -// TopicPartitions - only sent if it has changed since the last heartbeat. Note that -// the string consists of just the topic ID and the partitions. When an assignment is -// received, we might not yet know the topic name, and then it is learnt subsequently -// by a metadata update. -TreeSet assignedPartitions = membershipManager.currentAssignment().entrySet().stream() -.map(entry -> entry.getKey() + "-" + entry.getValue()) -.collect(Collectors.toCollection(TreeSet::new)); -if (!assignedPartitions.equals(sentFields.topicPartitions)) { +// TopicPartitions - sent with the first heartbeat after a new assignment from the server was +// reconciled. This is ensured by resending the topic partitions whenever the local assignment, +// including its local epoch is changed (although the local epoch is not sent in the heartbeat). +LocalAssignment local = membershipManager.currentAssignment(); +if (local == null) { +data.setTopicPartitions(Collections.emptyList()); +sentFields.topicPartitions = null; +} else if (!local.equals(sentFields.topicPartitions)) { Review Comment: Don't we also need to take the assignment epoch into consideration here? In other words, should we store the `LocalAssignment` in `sentFields` and use it for the comparison?
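A minimal sketch of dajac's two suggestions combined: the update logic lives in `LocalAssignment` and returns an `Optional` only when something changed, and a sentinel constant replaces `null`. The record is hypothetical, partitions are simplified to strings, the sentinel is named `NONE` as lucasbru proposed, and it uses an empty set rather than the `null` partitions mentioned in the review.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: update logic inside LocalAssignment and a sentinel instead of null.
// Partitions are simplified to strings; the sentinel holds an empty set rather than null.
record LocalAssignment(long localEpoch, Set<String> partitions) {
    // Named NONE rather than EMPTY so it is not confused with a real, empty assignment.
    static final LocalAssignment NONE = new LocalAssignment(-1, Set.of());

    // Returns the new assignment (with the local epoch bumped) only if something changed.
    Optional<LocalAssignment> updateWith(Set<String> newPartitions) {
        if (this != NONE && partitions.equals(newPartitions)) {
            return Optional.empty();
        }
        return Optional.of(new LocalAssignment(localEpoch + 1, Set.copyOf(newPartitions)));
    }

    public static void main(String[] args) {
        LocalAssignment first = NONE.updateWith(Set.of("t1-1")).orElseThrow();
        System.out.println(first.localEpoch());                           // 0
        System.out.println(first.updateWith(Set.of("t1-1")).isPresent()); // false
        System.out.println(first.updateWith(Set.of("t1-1", "t2-1")).orElseThrow().localEpoch()); // 1
    }
}
```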
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on PR #15364: URL: https://github.com/apache/kafka/pull/15364#issuecomment-1991716363 @jolshan Thanks for your comments. I addressed them.
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on PR #15364: URL: https://github.com/apache/kafka/pull/15364#issuecomment-1991715637
> > This is a non-backward-compatible change. I think that we should make this change to clean up the record. As KIP-848 is only in early access in 3.7 and we clearly state that we don't plan to support upgrades from it, this is acceptable in my opinion.
>
> Are we gating this record change under anything?

There is no gating at all for the records at the moment besides the static configuration to enable the protocol. We will introduce proper gating based on the new feature version for 3.8. With those changes, we will clearly break backward compatibility, but, as you said, we stated that upgrades from the EA won't be supported. I thought hard about not doing this, but I think it is preferable to do it so that we don't carry a bad record format from the start.
Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]
OmniaGM commented on code in PR #15506: URL: https://github.com/apache/kafka/pull/15506#discussion_r1521506327 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java: ## @@ -1215,17 +1195,13 @@ public void testSuppressCloseErrors() { workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); -try { -workerTask.execute(); -fail("workerTask.execute should have thrown an exception"); -} catch (ConnectException e) { -assertSame("Exception from put should be the cause", putException, e.getCause()); -assertTrue("Exception from close should be suppressed", e.getSuppressed().length > 0); -assertSame(closeException, e.getSuppressed()[0]); -} + +Throwable thrownException = assertThrows(ConnectException.class, () -> workerTask.execute()); +assertSame("Exception from put should be the cause", putException, thrownException.getCause()); Review Comment: `assertSame` and `assertEquals` behave the same when the default `equals` implementation is used (it compares references), but I'm curious why we are opting for `assertSame` over `assertEquals`. Do we need the test to explicitly compare object references, or just values?
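The difference in question, shown with plain Java instead of JUnit: `assertSame` checks `a == b`, while `assertEquals` checks `a.equals(b)`. `SameVsEquals` is an illustrative class only.

```java
// Plain-Java illustration: assertSame corresponds to a == b,
// assertEquals to a.equals(b) (null-safe, as Objects.equals does).
final class SameVsEquals {
    static boolean same(Object a, Object b) {
        return a == b;
    }

    static boolean equalValues(Object a, Object b) {
        return java.util.Objects.equals(a, b);
    }

    public static void main(String[] args) {
        RuntimeException e1 = new RuntimeException("put failed");
        RuntimeException e2 = new RuntimeException("put failed");
        // Throwable does not override equals(), so value equality is identity here.
        System.out.println(same(e1, e1) + " " + equalValues(e1, e2)); // true false
        String s1 = new String("abc");
        String s2 = new String("abc");
        // String overrides equals(), so the two checks can disagree.
        System.out.println(same(s1, s2) + " " + equalValues(s1, s2)); // false true
    }
}
```

For the exceptions in `testSuppressCloseErrors`, both assertions therefore pass or fail together; `assertSame` just states more explicitly that the very same instance must be propagated.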
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1521508636 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -931,17 +920,14 @@ public void testGroupEpochBumpWhenNewStaticMemberJoins() { ConsumerGroupMember expectedMember3 = new ConsumerGroupMember.Builder(memberId3) .setMemberEpoch(11) +.setState(MemberState.UNRELEASED_PARTITIONS) .setInstanceId(memberId3) .setPreviousMemberEpoch(0) -.setTargetMemberEpoch(11) .setClientId("client") .setClientHost("localhost/127.0.0.1") .setRebalanceTimeoutMs(5000) .setSubscribedTopicNames(Arrays.asList("foo", "bar")) .setServerAssignorName("range") -.setPartitionsPendingAssignment(mkAssignment( Review Comment: It is a matter of the difference between the owned partitions and the assignment. The field was not really used in the end, so I thought it was better to remove it to save space.
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1521507018 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -338,7 +338,6 @@ public void testConsumerGroupMemberEpochValidation() { ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId) .setMemberEpoch(100) .setPreviousMemberEpoch(99) -.setTargetMemberEpoch(100) Review Comment: I think it makes sense to add it. I reviewed all the cases in the file and added it where it makes sense.
Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]
OmniaGM commented on code in PR #15506: URL: https://github.com/apache/kafka/pull/15506#discussion_r1521506327 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java: ## @@ -1215,17 +1195,13 @@ public void testSuppressCloseErrors() { workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); -try { -workerTask.execute(); -fail("workerTask.execute should have thrown an exception"); -} catch (ConnectException e) { -assertSame("Exception from put should be the cause", putException, e.getCause()); -assertTrue("Exception from close should be suppressed", e.getSuppressed().length > 0); -assertSame(closeException, e.getSuppressed()[0]); -} + +Throwable thrownException = assertThrows(ConnectException.class, () -> workerTask.execute()); +assertSame("Exception from put should be the cause", putException, thrownException.getCause()); Review Comment: `assertSame` and `assertEquals` behave the same when the default `equals` implementation (which compares references) is used, but I'm curious why we are opting for `assertSame` over `assertEquals`. Do we need the test to explicitly compare object references, or just values?
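To make the distinction concrete: in JUnit, `assertSame(expected, actual)` checks reference identity (`==`), while `assertEquals(expected, actual)` checks `expected.equals(actual)`. `Throwable` does not override `equals`, so for exceptions the two coincide; they only diverge for types that define value equality. The plain-Java sketch below uses hypothetical helpers `sameRef` and `equalValue` as stand-ins for the two assertions.

```java
import java.util.Objects;

final class EqualityDemo {
    // What assertSame checks: reference identity.
    static boolean sameRef(Object expected, Object actual) {
        return expected == actual;
    }

    // What assertEquals checks: null-safe equals().
    static boolean equalValue(Object expected, Object actual) {
        return Objects.equals(expected, actual);
    }

    public static void main(String[] args) {
        // Throwable inherits Object.equals, so identity and equality coincide.
        RuntimeException put = new RuntimeException("put failed");
        RuntimeException other = new RuntimeException("put failed");
        System.out.println(sameRef(put, put) + " " + equalValue(put, put));     // true true
        System.out.println(sameRef(put, other) + " " + equalValue(put, other)); // false false

        // String overrides equals, so two distinct but equal strings diverge.
        String a = new String("cause");
        String b = new String("cause");
        System.out.println(sameRef(a, b) + " " + equalValue(a, b));             // false true
    }
}
```

For the `getCause()` check in this test, `assertSame` states the stronger intent: the cause must be the exact exception instance thrown from `put`, not merely an equal one.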
Re: [PR] MINOR: simplify consumer logic [kafka]
lianetm commented on code in PR #15519: URL: https://github.com/apache/kafka/pull/15519#discussion_r1521497910 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -529,10 +529,9 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); -// InstanceId - only sent if has changed since the last heartbeat -// Always send when leaving the group as a static member +// InstanceId - send when leaving the group as a static member membershipManager.groupInstanceId().ifPresent(groupInstanceId -> { -if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { +if (membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) { data.setInstanceId(groupInstanceId); sentFields.instanceId = groupInstanceId; Review Comment: Agreed, we could simplify this and remove `instanceId` from `sentFields`, as @chia7712 suggested. I don't see the need to track it: it's not something that will change, and we simply need to send the value we have. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -529,10 +529,9 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); -// InstanceId - only sent if has changed since the last heartbeat -// Always send when leaving the group as a static member +// InstanceId - send when leaving the group as a static member membershipManager.groupInstanceId().ifPresent(groupInstanceId -> { Review Comment: Agreed. The current coordinator expectation seems like the right thing to do, and we should send the `groupInstanceId` all the time (in the same way that we send the `memberId` all the time).
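The simplification both comments converge on (send `groupInstanceId` on every heartbeat, like `memberId`, instead of tracking it in `sentFields`) could look roughly like this. It is a sketch under stated assumptions: `HeartbeatData` and `HeartbeatBuilderSketch` are hypothetical stand-ins, not the real `ConsumerGroupHeartbeatRequestData` or `HeartbeatRequestManager`.

```java
import java.util.Optional;

// Hypothetical stand-in for ConsumerGroupHeartbeatRequestData; only the
// fields needed for this illustration are modeled.
final class HeartbeatData {
    String memberId;
    int memberEpoch;
    String instanceId; // null means "not set"
}

final class HeartbeatBuilderSketch {
    // memberId and the group instance id (when configured) are sent on every
    // heartbeat, so no per-field "last sent" state is needed for them.
    static HeartbeatData build(String memberId, int memberEpoch, Optional<String> groupInstanceId) {
        HeartbeatData data = new HeartbeatData();
        data.memberId = memberId;
        data.memberEpoch = memberEpoch;
        groupInstanceId.ifPresent(id -> data.instanceId = id);
        return data;
    }
}
```

Because the value never changes for a given consumer, always sending it costs a few bytes per request and removes a class of "field was not resent" bugs.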
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1521495237 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/MemberState.java: ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import java.util.HashMap; +import java.util.Map; + +/** + * The various states that a member can be in. For their definition, + * refer to the documentation of {{@link CurrentAssignmentBuilder}}. Review Comment: The doc is in the code now.
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1521494464 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -170,72 +127,122 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions( * @return A new ConsumerGroupMember or the current one. */ public ConsumerGroupMember build() { -// A new target assignment has been installed, we need to restart -// the reconciliation loop from the beginning. -if (targetAssignmentEpoch != member.targetMemberEpoch()) { -return transitionToNewTargetAssignmentState(); -} - switch (member.state()) { -// Check if the partitions have been revoked by the member. -case REVOKING: -return maybeTransitionFromRevokingToAssigningOrStable(); +case STABLE: +// When the member is in the STABLE state, we verify if a newer +// epoch (or target assignment) is available. If it is, we can +// reconcile the member towards it. Otherwise, we return. +if (member.memberEpoch() != targetAssignmentEpoch) { +return computeNextAssignment( +member.memberEpoch(), +member.assignedPartitions() +); +} else { +return member; +} -// Check if pending partitions have been freed up. -case ASSIGNING: -return maybeTransitionFromAssigningToAssigningOrStable(); +case UNREVOKED_PARTITIONS: +// When the member is in the UNREVOKED_PARTITIONS state, we wait +// until the member has revoked the necessary partitions. They are +// considered revoked when they are not anymore reported in the +// owned partitions set in the ConsumerGroupHeartbeat API. -// Nothing to do. -case STABLE: -return member; +// If the member does not provide its owned partitions. We cannot +// progress. +if (ownedTopicPartitions == null) { +return member; +} + +// If the member provides its owned partitions. We verify if it still +// owns any of the revoked partitions. If it does, we cannot progress. 
+for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) { +for (Integer partitionId : topicPartitions.partitions()) { +boolean stillHasRevokedPartition = member +.partitionsPendingRevocation() +.getOrDefault(topicPartitions.topicId(), Collections.emptySet()) +.contains(partitionId); +if (stillHasRevokedPartition) { +return member; +} +} +} + +// When the member has revoked all the pending partitions, it can +// transition to the next epoch (current + 1) and we can reconcile +// its state towards the latest target assignment. +return computeNextAssignment( +member.memberEpoch() + 1, +member.assignedPartitions() +); + +case UNRELEASED_PARTITIONS: +// When the member is in the UNRELEASED_PARTITIONS, we reconcile the +// member towards the latest target assignment. This will assign any +// of the unreleased partitions when they become available. +return computeNextAssignment( +member.memberEpoch(), +member.assignedPartitions() +); + +case UNKNOWN: +// We could only end up in this state if a new state is added in the +// future and the group coordinator is downgraded. In this case, the +// best option is to fence the member to force it to rejoin the group +// without any partitions and to reconcile it again from scratch. +if (ownedTopicPartitions == null || !ownedTopicPartitions.isEmpty()) { +throw new FencedMemberEpochException("The consumer group member is in a unknown state. " ++ "The member must abandon all its partitions and rejoin."); +} + +return computeNextAssignment( Review Comment: We would only hit this case if we ever introduce a new state in the future. I was trying to ensure that we would handle it somehow. My proposal here is to return the fenced member epoch error in order to force the
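The UNREVOKED_PARTITIONS check in `build()` can be distilled into a standalone helper. This is a simplified sketch: `OwnedTopicPartitions` is a hypothetical stand-in for `ConsumerGroupHeartbeatRequestData.TopicPartitions`, and topic ids are plain strings instead of Kafka's `Uuid`. It returns whether the member may leave the state, mirroring the two early `return member;` exits in the diff.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class RevocationCheck {
    // Hypothetical stand-in for ConsumerGroupHeartbeatRequestData.TopicPartitions.
    static final class OwnedTopicPartitions {
        final String topicId;
        final List<Integer> partitions;

        OwnedTopicPartitions(String topicId, List<Integer> partitions) {
            this.topicId = topicId;
            this.partitions = partitions;
        }
    }

    // True when the member reported its owned partitions and none of them is
    // still pending revocation, i.e. it may move to the next epoch.
    static boolean hasRevokedAll(Map<String, Set<Integer>> pendingRevocation,
                                 List<OwnedTopicPartitions> ownedTopicPartitions) {
        if (ownedTopicPartitions == null) {
            return false; // owned partitions not provided: cannot progress
        }
        for (OwnedTopicPartitions owned : ownedTopicPartitions) {
            Set<Integer> pending =
                pendingRevocation.getOrDefault(owned.topicId, Collections.emptySet());
            for (Integer partitionId : owned.partitions) {
                if (pending.contains(partitionId)) {
                    return false; // still owns a partition it must revoke
                }
            }
        }
        return true;
    }
}
```

Note that an empty owned list counts as "everything revoked", which matches the loop in the diff falling through to `computeNextAssignment(member.memberEpoch() + 1, ...)`.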
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1521491629 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -33,49 +33,6 @@ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. - * - * The member state has the following properties: - * - Current Epoch: Review Comment: Indeed. I moved the description closer to the code.
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1521490928 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ## @@ -528,27 +478,6 @@ public Map> partitionsPendingRevocation() { return partitionsPendingRevocation; } -/** - * @return The set of partitions awaiting assignment to the member. - */ -public Map> partitionsPendingAssignment() { -return partitionsPendingAssignment; -} - -/** - * @return A string representation of the current assignment state. - */ -public String currentAssignmentSummary() { Review Comment: Correct.
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1521488580 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1796,12 +1834,12 @@ public void onLoaded() { consumerGroup.members().forEach((memberId, member) -> { log.debug("Loaded member {} in consumer group {}.", memberId, groupId); scheduleConsumerGroupSessionTimeout(groupId, memberId); -if (member.state() == ConsumerGroupMember.MemberState.REVOKING) { -scheduleConsumerGroupRevocationTimeout( +if (member.state() == MemberState.UNREVOKED_PARTITIONS) { +scheduleConsumerGroupRebalanceTimeout( groupId, -memberId, -member.rebalanceTimeoutMs(), -member.memberEpoch() +member.memberId(), Review Comment: I am actually not sure why I changed it, but both have the same value.
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1521487635 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1400,35 +1440,35 @@ private void cancelConsumerGroupSessionTimeout( } /** - * Schedules a revocation timeout for the member. + * Schedules a rebalance timeout for the member. * * @param groupId The group id. * @param memberId The member id. - * @param revocationTimeoutMs The revocation timeout. - * @param expectedMemberEpoch The expected member epoch. + * @param memberEpoch The member epoch. + * @param rebalanceTimeoutMsThe rebalance timeout. */ -private void scheduleConsumerGroupRevocationTimeout( +private void scheduleConsumerGroupRebalanceTimeout( String groupId, String memberId, -long revocationTimeoutMs, -int expectedMemberEpoch +int memberEpoch, +int rebalanceTimeoutMs ) { -String key = consumerGroupRevocationTimeoutKey(groupId, memberId); -timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +String key = consumerGroupRebalanceTimeoutKey(groupId, memberId); +timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { try { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); -if (member.state() != ConsumerGroupMember.MemberState.REVOKING || -member.memberEpoch() != expectedMemberEpoch) { -log.debug("[GroupId {}] Ignoring revocation timeout for {} because the member " + -"state does not match the expected state.", groupId, memberId); +if (member.memberEpoch() == memberEpoch) { +log.info("[GroupId {}] Member {} fenced from the group because " + +"it failed to transition from epoch {} within {}ms.", +groupId, memberId, memberEpoch, rebalanceTimeoutMs); +return new CoordinatorResult<>(consumerGroupFenceMember(group, member)); Review Comment: > I may have asked this on a previous pr, but are we assuming the member epoch of 
the member (not the one passed in) is always never less than the member epoch passed into this method. That makes sense given the epoch is monotonically increasing, but just wanted to confirm. It is only newer when the member confirms that it has revoked the partitions. Here we are basically saying that we want a member to transition from its current epoch (and ack the revocation) within the rebalance timeout. > As an aside, when we fence a group member, do we basically kick it out of the group and force it to rejoin? Can the client rejoin without restarting? On the server, we only fence the member. On the client, it rejoins when it is fenced, but this is a client-side implementation choice.
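The epoch guard described above can be sketched in isolation. This is a minimal illustration using a hypothetical `Member` model rather than Kafka's `ConsumerGroupMember`: the timer captures the member epoch when it is scheduled and fences only if the member is still at that epoch when it fires. Because epochs only increase, "changed" implies "newer", which means the member acked the revocation in time.

```java
final class RebalanceTimeoutSketch {
    // Hypothetical, minimal member model: only the epoch and a fenced flag.
    static final class Member {
        int memberEpoch;
        boolean fenced;

        Member(int memberEpoch) {
            this.memberEpoch = memberEpoch;
        }
    }

    // Invoked when the timer scheduled for scheduledEpoch fires. Returns true
    // if the member was fenced, false if the timeout is stale and ignored.
    static boolean onRebalanceTimeout(Member member, int scheduledEpoch) {
        if (member.memberEpoch == scheduledEpoch) {
            // Still at the epoch it had when the timer was armed: it failed to
            // transition within the rebalance timeout.
            member.fenced = true;
            return true;
        }
        // The member already moved to a newer epoch; nothing to do.
        return false;
    }
}
```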
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1521482723 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated. -if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { +if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createResponseAssignment(updatedMember)); } return new CoordinatorResult<>(records, response); } +/** + * Reconciles the current assignment of the member if needed. + * + * @param groupId The group id. + * @param memberThe member to reconcile. + * @param currentPartitionEpoch The function returning the current epoch of + * a given partition. + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @param ownedTopicPartitions The list of partitions owned by the member. This + * is reported in the ConsumerGroupHeartbeat API and + * it could be null if not provided. + * @param records The list to accumulate any new records. + * @return The received member if no changes have been made; or a new + * member containing the new assignment. 
+ */ +private ConsumerGroupMember maybeReconcile( +String groupId, +ConsumerGroupMember member, +BiFunction currentPartitionEpoch, +int targetAssignmentEpoch, +Assignment targetAssignment, +List ownedTopicPartitions, +List records +) { +if (member.isReconciledTo(targetAssignmentEpoch)) { +return member; +} + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(targetAssignmentEpoch, targetAssignment) +.withCurrentPartitionEpoch(currentPartitionEpoch) +.withOwnedTopicPartitions(ownedTopicPartitions) +.build(); + +if (!updatedMember.equals(member)) { +records.add(newCurrentAssignmentRecord(groupId, updatedMember)); + +log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, " + + "assignedPartitions={} and revokedPartitions={}.", +groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), +formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions())); + +if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { +scheduleConsumerGroupRebalanceTimeout( +groupId, +updatedMember.memberId(), +updatedMember.memberEpoch(), +updatedMember.rebalanceTimeoutMs() +); +} else { Review Comment: I used "rebalance timeout" to be consistent with the `rebalanceTimeoutMs` field. In the end, the name does not really matter here; what matters is how we apply it.
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1521477968 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr // 1. The member reported its owned partitions; Review Comment: I will address this separately: https://issues.apache.org/jira/browse/KAFKA-16367.
[jira] [Created] (KAFKA-16367) Full ConsumerGroupHeartbeat response must be sent when full request is received
David Jacot created KAFKA-16367: --- Summary: Full ConsumerGroupHeartbeat response must be sent when full request is received Key: KAFKA-16367 URL: https://issues.apache.org/jira/browse/KAFKA-16367 Project: Kafka Issue Type: Sub-task Reporter: David Jacot Assignee: David Jacot -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]
OmniaGM commented on code in PR #15506: URL: https://github.com/apache/kafka/pull/15506#discussion_r1521477925 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java: ## @@ -580,12 +570,9 @@ public void testErrorInRebalancePartitionRevocation() { workerTask.iteration(); verifyPollInitialAssignment(); -try { -workerTask.iteration(); -fail("Poll should have raised the rebalance exception"); -} catch (RuntimeException e) { -assertEquals(exception, e); -} + +Throwable thrownException = assertThrows(RuntimeException.class, () -> workerTask.iteration()); Review Comment: Same here; I am not going to point it out for each test.
Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]
OmniaGM commented on code in PR #15506: URL: https://github.com/apache/kafka/pull/15506#discussion_r1521475624 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java: ## @@ -553,12 +547,8 @@ public void testErrorInRebalancePartitionLoss() { workerTask.iteration(); verifyPollInitialAssignment(); -try { -workerTask.iteration(); -fail("Poll should have raised the rebalance exception"); -} catch (RuntimeException e) { -assertEquals(exception, e); -} +Throwable thrownException = assertThrows(RuntimeException.class, () -> workerTask.iteration()); Review Comment: The type of `thrownException` should be `RuntimeException`, as the result is already cast to that type when we call `assertThrows(RuntimeException.class, ...)`.
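The reason the narrower type works is that JUnit's `assertThrows` is generic: the `Class<T>` argument fixes the static return type `T`. The sketch below mirrors that shape with a hypothetical `expectThrows` helper (not JUnit itself) so the typing is visible without a test dependency.

```java
final class AssertThrowsSketch {
    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Throwable;
    }

    // Same shape as JUnit's assertThrows: the Class<T> argument determines
    // the static return type, so callers get a T rather than a Throwable.
    static <T extends Throwable> T expectThrows(Class<T> expectedType, ThrowingRunnable runnable) {
        try {
            runnable.run();
        } catch (Throwable actual) {
            if (expectedType.isInstance(actual)) {
                return expectedType.cast(actual);
            }
            throw new AssertionError("Unexpected exception type: " + actual.getClass().getName(), actual);
        }
        throw new AssertionError("Expected " + expectedType.getName() + " to be thrown");
    }
}
```

With this shape, `RuntimeException thrown = expectThrows(RuntimeException.class, () -> { throw boom; });` compiles without a cast, which is exactly what declaring `thrownException` as `RuntimeException` relies on.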
[jira] [Updated] (KAFKA-16293) Test log directory failure in Kraft
[ https://issues.apache.org/jira/browse/KAFKA-16293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim updated KAFKA-16293: -- Fix Version/s: 3.7.1 > Test log directory failure in Kraft > --- > > Key: KAFKA-16293 > URL: https://issues.apache.org/jira/browse/KAFKA-16293 > Project: Kafka > Issue Type: Test > Components: jbod, kraft >Affects Versions: 3.7.0 >Reporter: Gaurav Narula >Assignee: Gaurav Narula >Priority: Major > Fix For: 3.7.1 > > > We should update the log directory failure system test to run in Kraft mode > following the work for KIP 858.
[jira] [Updated] (KAFKA-16234) Log directory failure re-creates partitions in another logdir automatically
[ https://issues.apache.org/jira/browse/KAFKA-16234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim updated KAFKA-16234: -- Fix Version/s: 3.7.1 > Log directory failure re-creates partitions in another logdir automatically > --- > > Key: KAFKA-16234 > URL: https://issues.apache.org/jira/browse/KAFKA-16234 > Project: Kafka > Issue Type: Sub-task > Components: jbod >Affects Versions: 3.7.0 >Reporter: Gaurav Narula >Assignee: Omnia Ibrahim >Priority: Major > Fix For: 3.7.1 > > > With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes > in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. > Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old > and new topicIds to decide if it needs to create a new log. > The getter for {{Partition::topicId}} relies on retrieving the topicId from > {{log}} field or {{logManager.currentLogs}}. The former is set to > {{None}} when a partition is marked offline and the key for the partition is > removed from the latter by {{LogManager::handleLogDirFailure}}. > Therefore, topicId for a partition marked offline always returns {{None}} > and new logs for all partitions in a failed log directory are always created > on another disk. > The broker will fail to restart after the failed disk is repaired because > the same partitions will occur in two different directories. The error does > however inform the operator to remove the partitions from the disk that > failed which should help with broker startup. > We can avoid this with KAFKA-16212 but in the short-term, an immediate > solution can be to have {{Partition}} object accept {{Option[TopicId]}} in > its constructor and have it fall back to {{log}} or {{logManager}} if it's > unset.
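The short-term fix proposed in the issue (a topic id supplied at construction time that takes precedence, with fallback to the log and then the log manager) can be sketched as a lookup chain. This is a hypothetical Java rendering of the idea, not Kafka's Scala `Partition` class: topic ids are plain strings, and the two suppliers stand in for reading `log` and `logManager.currentLogs`.

```java
import java.util.Optional;
import java.util.function.Supplier;

final class PartitionSketch {
    private final Optional<String> topicIdFromConstructor;
    private final Supplier<Optional<String>> topicIdFromLog;
    private final Supplier<Optional<String>> topicIdFromLogManager;

    PartitionSketch(Optional<String> topicId,
                    Supplier<Optional<String>> topicIdFromLog,
                    Supplier<Optional<String>> topicIdFromLogManager) {
        this.topicIdFromConstructor = topicId;
        this.topicIdFromLog = topicIdFromLog;
        this.topicIdFromLogManager = topicIdFromLogManager;
    }

    // The explicit id wins; otherwise consult the log, then the log manager.
    // Optional.or (Java 9+) only evaluates a supplier if the previous step was empty.
    Optional<String> topicId() {
        return topicIdFromConstructor.or(topicIdFromLog).or(topicIdFromLogManager);
    }
}
```

For an offline partition both fallbacks return empty, so without the constructor argument the lookup yields empty and a new log is created elsewhere; with it, the original topic id survives and can be compared.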
[jira] [Updated] (KAFKA-16365) AssignmentsManager mismanages completion notifications
[ https://issues.apache.org/jira/browse/KAFKA-16365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim updated KAFKA-16365: -- Affects Version/s: 3.7.0 > AssignmentsManager mismanages completion notifications > -- > > Key: KAFKA-16365 > URL: https://issues.apache.org/jira/browse/KAFKA-16365 > Project: Kafka > Issue Type: Sub-task > Components: jbod >Affects Versions: 3.7.0 >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Major > Fix For: 3.7.1 > > > When moving replicas between directories in the same broker, future replica > promotion hinges on acknowledgment from the controller of a change in the > directory assignment. > > ReplicaAlterLogDirsThread relies on AssignmentsManager for a completion > notification of the directory assignment change. > > In its current form, under certain assignment scheduling, AssignmentsManager > can both miss completion notifications and prematurely trigger them.
[jira] [Updated] (KAFKA-16365) AssignmentsManager mismanages completion notifications
[ https://issues.apache.org/jira/browse/KAFKA-16365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim updated KAFKA-16365: -- Fix Version/s: 3.7.1 > AssignmentsManager mismanages completion notifications > -- > > Key: KAFKA-16365 > URL: https://issues.apache.org/jira/browse/KAFKA-16365 > Project: Kafka > Issue Type: Sub-task > Components: jbod >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Major > Fix For: 3.7.1 > > > When moving replicas between directories in the same broker, future replica > promotion hinges on acknowledgment from the controller of a change in the > directory assignment. > > ReplicaAlterLogDirsThread relies on AssignmentsManager for a completion > notification of the directory assignment change. > > In its current form, under certain assignment scheduling, AssignmentsManager > can both miss completion notifications and prematurely trigger them.
[jira] [Updated] (KAFKA-16365) AssignmentsManager mismanages completion notifications
[ https://issues.apache.org/jira/browse/KAFKA-16365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim updated KAFKA-16365: -- Component/s: jbod > AssignmentsManager mismanages completion notifications > -- > > Key: KAFKA-16365 > URL: https://issues.apache.org/jira/browse/KAFKA-16365 > Project: Kafka > Issue Type: Sub-task > Components: jbod >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Major > > When moving replicas between directories in the same broker, future replica > promotion hinges on acknowledgment from the controller of a change in the > directory assignment. > > ReplicaAlterLogDirsThread relies on AssignmentsManager for a completion > notification of the directory assignment change. > > In its current form, under certain assignment scheduling, AssignmentsManager > can both miss completion notifications and prematurely trigger them.
Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]
OmniaGM commented on code in PR #15335: URL: https://github.com/apache/kafka/pull/15335#discussion_r1521445120 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -355,10 +355,11 @@ class LogManager(logDirs: Seq[File], } else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) { addStrayLog(topicPartition, log) warn(s"Loaded stray log: $logDir") -} else if (shouldBeStrayKraftLog(log)) { - // Mark the partition directories we're not supposed to have as stray. We have to do this - // during log load because topics may have been recreated with the same name while a disk - // was offline. +} else if (isStray(log.topicId, topicPartition)) { + // Opposite of Zookeeper mode deleted topic in KRAFT mode can be recreated while it's not fully deleted from broker. + // As a result of this broker in KRAFT mode with one offline directory has no way to detect to-be-deleted replica in an offline directory earlier. + // However, broker need to mark the partition directories as stray during log load because topics may have been + // recreated with the same name while a log directory was offline. Review Comment: Updated with the suggestion.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1521398482 ## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ## @@ -243,33 +243,20 @@ public MemoryRecords build() { /** * Get the max timestamp and its offset. The details of the offset returned are a bit subtle. * - * If the log append time is used, the offset will be the last offset unless no compression is used and Review Comment: Updated. Thanks.
Re: [PR] MINOR: Improve logging in AssignmentsManager [kafka]
soarez commented on PR #15522: URL: https://github.com/apache/kafka/pull/15522#issuecomment-1991528082 @showuon PTAL
Re: [PR] KAFKA-16365: AssignmentsManager callback handling issues [kafka]
soarez commented on PR #15521: URL: https://github.com/apache/kafka/pull/15521#issuecomment-1991526197 @showuon @OmniaGM @gaurav-narula could you have a look?
Re: [PR] KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test [kafka]
nikramakrishnan commented on PR #15523: URL: https://github.com/apache/kafka/pull/15523#issuecomment-1991480460 Thanks for the change, @kamalcph! Just for my knowledge, why is it possible that the Shutdownable thread never ran in CI for this test?
Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]
johnnychhsu commented on code in PR #15476: URL: https://github.com/apache/kafka/pull/15476#discussion_r1521314724 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ## @@ -293,11 +293,11 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, if (timestampType == TimestampType.LOG_APPEND_TIME) { maxTimestamp = now; -offsetOfMaxTimestamp = initialOffset; -} - -if (toMagic >= RecordBatch.MAGIC_VALUE_V2) { -offsetOfMaxTimestamp = offsetCounter.value - 1; +if (toMagic >= RecordBatch.MAGIC_VALUE_V2) { +offsetOfMaxTimestamp = offsetCounter.value - 1; Review Comment: Thanks for the comment! This is the same behaviour as before the previous change affected the results: it differentiates the behaviour based on the magic value. I think the reason is the following, from the `RecordBatch` javadoc: > /** > A record batch is a container for records. In old versions of the record format (versions 0 and 1), > a batch consisted always of a single record if no compression was enabled, but could contain > many records otherwise. Newer versions (magic versions 2 and above) will generally contain many records > regardless of compression. >*/ > public interface RecordBatch extends Iterable<Record> So versions before magic value 2 don't matter here.
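To make the `offsetCounter.value - 1` expression in the diff concrete, here is a minimal Java sketch, assuming the counter is advanced once per record while offsets are assigned. `Counter` and `assignOffsets` are hypothetical stand-ins, not Kafka's `LongRef` or `LogValidator` code.

```java
// Hypothetical sketch: models how an offset counter advances while offsets are
// assigned to an uncompressed batch. Counter is a stand-in, not Kafka's LongRef.
public class OffsetCounterSketch {

    /** Minimal mutable long holder. */
    static final class Counter {
        long value;

        Counter(long value) {
            this.value = value;
        }

        long getAndIncrement() {
            return value++;
        }
    }

    /** Assigns consecutive offsets to recordCount records and returns them. */
    static long[] assignOffsets(Counter counter, int recordCount) {
        long[] offsets = new long[recordCount];
        for (int i = 0; i < recordCount; i++) {
            offsets[i] = counter.getAndIncrement();
        }
        return offsets;
    }

    public static void main(String[] args) {
        Counter counter = new Counter(100L);
        long initialOffset = counter.value;          // first offset of this append
        long[] offsets = assignOffsets(counter, 5);  // assigns 100..104
        // After assignment the counter points one past the last record,
        // so counter.value - 1 is the last assigned offset.
        System.out.println("initialOffset = " + initialOffset);                    // 100
        System.out.println("last offset   = " + offsets[offsets.length - 1]);      // 104
        System.out.println("counter - 1   = " + (counter.value - 1));              // 104
    }
}
```

Under this assumption, `initialOffset` and `offsetCounter.value - 1` bracket the append: the first names the earliest record, the second the latest.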
Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]
johnnychhsu commented on code in PR #15476: URL: https://github.com/apache/kafka/pull/15476#discussion_r1521308393 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ## @@ -293,11 +293,11 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter, if (timestampType == TimestampType.LOG_APPEND_TIME) { maxTimestamp = now; -offsetOfMaxTimestamp = initialOffset; -} - -if (toMagic >= RecordBatch.MAGIC_VALUE_V2) { -offsetOfMaxTimestamp = offsetCounter.value - 1; +if (toMagic >= RecordBatch.MAGIC_VALUE_V2) { +offsetOfMaxTimestamp = offsetCounter.value - 1; +} else { +offsetOfMaxTimestamp = initialOffset; Review Comment: Thanks for the comment! Yes, the naming is confusing: although it does represent the latest offset, it looks as if it were only the offset of one batch rather than of the record. Let me address this.
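The post-change branch in this diff can be read as a small pure function. This is a minimal sketch that covers only the `LOG_APPEND_TIME` path; the standalone `offsetOfMaxTimestamp` method and the local `MAGIC_VALUE_V2` constant are hypothetical stand-ins for the fields in `LogValidator` and `RecordBatch`, and `nextOffset` plays the role of `offsetCounter.value` after assignment.

```java
// Hypothetical sketch of the offsetOfMaxTimestamp selection in the diff,
// LOG_APPEND_TIME path only. Names mirror the diff; this is not Kafka code.
public class MaxTimestampOffsetSketch {

    // Local stand-in for RecordBatch.MAGIC_VALUE_V2 (record format v2 uses magic value 2).
    static final byte MAGIC_VALUE_V2 = 2;

    /**
     * Returns the offset reported alongside the max timestamp when the broker
     * overwrites record timestamps with its own append time.
     *
     * @param toMagic       target record format version
     * @param initialOffset first offset assigned in this append
     * @param nextOffset    offset counter value after assignment (one past the last)
     */
    static long offsetOfMaxTimestamp(byte toMagic, long initialOffset, long nextOffset) {
        if (toMagic >= MAGIC_VALUE_V2) {
            // v2 and above: report the last offset assigned in this append.
            return nextOffset - 1;
        } else {
            // v0/v1: report the first offset assigned in this append.
            return initialOffset;
        }
    }

    public static void main(String[] args) {
        // Five records appended at offsets 100..104, so the counter ends at 105.
        System.out.println(offsetOfMaxTimestamp((byte) 2, 100L, 105L)); // 104
        System.out.println(offsetOfMaxTimestamp((byte) 1, 100L, 105L)); // 100
    }
}
```

Keeping the choice in one `if`/`else` makes it explicit that, with log append time, every record gets the same timestamp, so the reported offset depends only on the format version, not on record contents.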
Re: [PR] KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test [kafka]
kamalcph commented on PR #15523: URL: https://github.com/apache/kafka/pull/15523#issuecomment-1991432854 @divijvaidya @showuon Would you please review this minor change?
[PR] KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test [kafka]
kamalcph opened a new pull request, #15523: URL: https://github.com/apache/kafka/pull/15523 It is possible that the Shutdownable thread never ran in the CI, which makes the test fail. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Assigned] (KAFKA-15206) Flaky test RemoteIndexCacheTest.testClose()
[ https://issues.apache.org/jira/browse/KAFKA-15206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash reassigned KAFKA-15206: Assignee: Kamal Chandraprakash (was: Lan Ding) > Flaky test RemoteIndexCacheTest.testClose() > Key: KAFKA-15206 > URL: https://issues.apache.org/jira/browse/KAFKA-15206 > Project: Kafka > Issue Type: Test > Reporter: Divij Vaidya > Assignee: Kamal Chandraprakash > Priority: Minor > Labels: flaky-test > Fix For: 3.8.0 > Test fails 2% of the time. > [https://ge.apache.org/scans/tests?search.timeZoneId=Europe/Berlin=kafka.log.remote.RemoteIndexCacheTest=testClose()] > This test should be modified to test assertTrue(cache.cleanerThread.isShutdownComplete) in a TestUtils.waitUntilTrue condition which will catch the InterruptedException and exit successfully on it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
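The suggested fix replaces an immediate assertion with a polled condition. Here is a minimal Java sketch of that pattern, assuming a cleaner thread that exits on interruption and then records that its shutdown is complete. `CleanerThread` and `waitUntilTrue` are hypothetical stand-ins, not Kafka's `ShutdownableThread` or `TestUtils.waitUntilTrue`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: poll for shutdown completion instead of asserting it at once.
public class WaitUntilTrueSketch {

    /** A worker that loops until interrupted, then marks its shutdown as complete. */
    static final class CleanerThread extends Thread {
        private final AtomicBoolean shutdownComplete = new AtomicBoolean(false);

        @Override
        public void run() {
            try {
                while (!isInterrupted()) {
                    Thread.sleep(10); // simulated periodic cleanup work
                }
            } catch (InterruptedException e) {
                // Interruption is the expected shutdown signal; exit normally.
            } finally {
                shutdownComplete.set(true);
            }
        }

        boolean isShutdownComplete() {
            return shutdownComplete.get();
        }
    }

    /** Polls the condition until it holds; throws AssertionError(msg) after timeoutMs. */
    static void waitUntilTrue(BooleanSupplier condition, String msg, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline > 0) {
                throw new AssertionError(msg);
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting: " + msg, e);
            }
        }
    }

    public static void main(String[] args) {
        CleanerThread cleaner = new CleanerThread();
        cleaner.start();
        cleaner.interrupt(); // request shutdown, as closing the cache would
        // An immediate assert on isShutdownComplete() here would race with the thread,
        // which may not have run yet; polling tolerates that delay.
        waitUntilTrue(cleaner::isShutdownComplete, "cleaner thread did not shut down", 5_000);
        System.out.println("shutdown complete: " + cleaner.isShutdownComplete());
    }
}
```

Because the flag is set in `finally`, it becomes true whether the thread sees the interrupt in its loop check or as an `InterruptedException` from `sleep`, including the case where the interrupt arrives before `run()` has started.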