Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-12 Thread via GitHub


sjhajharia commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1522533304


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -42,6 +42,11 @@ object AddPartitionsToTxnManager {
   val VerificationTimeMsMetricName = "VerificationTimeMs"
 }
 
+object OperationExpected extends Enumeration {
+  type operation = Value
+  val defaultOperation, latestProduceVersion, addPartitionWithoutVerification = Value

Review Comment:
   Ack. Updated the code to use a `sealed trait`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-12 Thread via GitHub


showuon commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1522451780


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -293,11 +293,11 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
 
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
             maxTimestamp = now;
-            offsetOfMaxTimestamp = initialOffset;
-        }
-
-        if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
-            offsetOfMaxTimestamp = offsetCounter.value - 1;
+            if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
+                offsetOfMaxTimestamp = offsetCounter.value - 1;

Review Comment:
   > I prefer the consistent behavior. Hence, it seems to me that adding
   > documentation to `OffsetSpec.MaxTimestampSpec` is enough here. The use cases
   > that depend on the previous behavior (last offset) should be rewritten for
   > this bug fix (and behavior change).
   
   +1 from me. 






Re: [PR] [DO NOT MERGE] KAFKA-14419: limit time spent processing during ongoing rebalance and delay followup rebalance trigger [kafka]

2024-03-12 Thread via GitHub


github-actions[bot] commented on PR #15009:
URL: https://github.com/apache/kafka/pull/15009#issuecomment-1993330968

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-12 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1522447288


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -242,34 +242,23 @@ public MemoryRecords build() {
 
     /**
      * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
+     * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
      *
-     * If the log append time is used, the offset will be the last offset unless no compression is used and
-     * the message format version is 0 or 1, in which case, it will be the first offset.
+     * If the log append time is used, the offset will be the first offset of the record.
      *
-     * If create time is used, the offset will be the last offset unless no compression is used and the message
-     * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp.
+     * If create time is used, the offset will always be the offset of the record with the max timestamp.
+     *
+     * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
      *
      * @return The max timestamp and its offset
      */
     public RecordsInfo info() {
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
-            long shallowOffsetOfMaxTimestamp;
-            // Use the last offset when dealing with record batches
-            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-                shallowOffsetOfMaxTimestamp = lastOffset;
-            else
-                shallowOffsetOfMaxTimestamp = baseOffset;
-            return new RecordsInfo(logAppendTime, shallowOffsetOfMaxTimestamp);
-        } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
-            return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
+            return new RecordsInfo(logAppendTime, baseOffset);
         } else {
-            long shallowOffsetOfMaxTimestamp;
-            // Use the last offset when dealing with record batches
-            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-                shallowOffsetOfMaxTimestamp = lastOffset;
-            else
-                shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-            return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+            // For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping
+            // If it's MAGIC_VALUE_V0, the value will be the default value: [-1, -1]
+            return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);

Review Comment:
   Currently, `assignOffsetsNonCompressed` and 
`validateMessagesAndAssignOffsetsCompressed` always return `[-1, -1]` 
for maxTimestamp and offset if it's `MAGIC_VALUE_V0` 
[here](https://github.com/apache/kafka/pull/15474/files#diff-74aba980d40ab3c5c6fd66d7a27ffb4515181130e475b589834538d05aa408b9L263-L264).
 But when rebuilding the records, we return `[-1, lastOffset]`, 
which is inconsistent. Fixing it here.
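
The resulting behavior of `info()` can be summarized in a small, self-contained sketch. This is a simplified model and not the actual `MemoryRecordsBuilder` code: the class name `RecordsInfoSketch`, the `long[]` return value (standing in for `RecordsInfo`), and the explicit parameters are assumptions made for illustration.

```java
final class RecordsInfoSketch {
    static final long NO_TIMESTAMP = -1L; // same value as RecordBatch.NO_TIMESTAMP

    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /** Returns {maxTimestamp, offsetOfMaxTimestamp}; a plain array stands in for RecordsInfo. */
    static long[] info(TimestampType type, long logAppendTime, long baseOffset,
                       long maxTimestamp, long offsetOfMaxTimestamp) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            // All records share the append time, so the first offset is the
            // first offset carrying the max timestamp.
            return new long[] {logAppendTime, baseOffset};
        }
        // Create time: pass the tracked offset through unchanged. If no timestamp
        // was ever tracked (MAGIC_VALUE_V0), both inputs are still -1, giving [-1, -1].
        return new long[] {maxTimestamp, offsetOfMaxTimestamp};
    }

    public static void main(String[] args) {
        long[] v0 = info(TimestampType.CREATE_TIME, NO_TIMESTAMP, 40L, NO_TIMESTAMP, -1L);
        System.out.println(v0[0] + ", " + v0[1]);
    }
}
```

The point of the sketch is that no branch substitutes `lastOffset` any more, so the rebuilt records report the same pair as the validation paths.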






Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-03-12 Thread via GitHub


Phuc-Hong-Tran commented on PR #15188:
URL: https://github.com/apache/kafka/pull/15188#issuecomment-1993206506

   The regex validity check will be included in the next pull request; I'll try to 
get it done by this weekend.





Re: [PR] KAFKA-15716: KRaft support in EpochDrivenReplicationProtocolAcceptanceTest [kafka]

2024-03-12 Thread via GitHub


highluck commented on code in PR #15295:
URL: https://github.com/apache/kafka/pull/15295#discussion_r1522384690


##
core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala:
##
@@ -53,17 +58,26 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends 
QuorumTestHarness wit
   val topic = "topic1"
   val msg = new Array[Byte](1000)
   val msgBigger = new Array[Byte](1)
-  var brokers: Seq[KafkaServer] = _
+  var brokers: Seq[KafkaBroker] = _
   var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
   var consumer: Consumer[Array[Byte], Array[Byte]] = _
 
+  private def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
+  private def listenerName: ListenerName = 
ListenerName.forSecurityProtocol(securityProtocol)
+
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
+if (TestInfoUtils.isKRaft(testInfo) && 
metadataVersion.isLessThan(IBP_3_3_IV0)) {

Review Comment:
   Oh, thank you! It seems unnecessary. I'll edit it.






Re: [PR] KAFKA-15716: KRaft support in EpochDrivenReplicationProtocolAcceptanceTest [kafka]

2024-03-12 Thread via GitHub


highluck commented on PR #15295:
URL: https://github.com/apache/kafka/pull/15295#issuecomment-1993132575

   @mimaison
   Thanks for the review.
   
   I felt I needed to think about this a little more, so I think it is better 
to handle it in a follow-up PR, because the test does not work with the broker 
structure currently in use.
   Once the current work is finished, I will do the follow-up work!





Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-12 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1522334545


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1211,13 +1192,99 @@ private 
CoordinatorResult consumerGr
 // 1. The member reported its owned partitions;
 // 2. The member just joined or rejoined to group (epoch equals to 
zero);
 // 3. The member's assignment has been updated.
-if (ownedTopicPartitions != null || memberEpoch == 0 || 
assignmentUpdated) {
+if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {
 response.setAssignment(createResponseAssignment(updatedMember));
 }
 
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Reconciles the current assignment of the member if needed.
+ *
+ * @param groupId               The group id.
+ * @param member                The member to reconcile.
+ * @param currentPartitionEpoch The function returning the current epoch of
+ *                              a given partition.
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment      The target assignment.
+ * @param ownedTopicPartitions  The list of partitions owned by the member. This
+ *                              is reported in the ConsumerGroupHeartbeat API and
+ *                              it could be null if not provided.
+ * @param records               The list to accumulate any new records.
+ * @return The received member if no changes have been made; or a new
+ * member containing the new assignment.
+ */
+private ConsumerGroupMember maybeReconcile(
+String groupId,
+ConsumerGroupMember member,
+BiFunction currentPartitionEpoch,
+int targetAssignmentEpoch,
+Assignment targetAssignment,
+List 
ownedTopicPartitions,
+List records
+) {
+if (member.isReconciledTo(targetAssignmentEpoch)) {
+return member;
+}
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+.withCurrentPartitionEpoch(currentPartitionEpoch)
+.withOwnedTopicPartitions(ownedTopicPartitions)
+.build();
+
+if (!updatedMember.equals(member)) {
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+log.info("[GroupId {}] Member {} new assignment state: epoch={}, 
previousEpoch={}, state={}, "
+ + "assignedPartitions={} and revokedPartitions={}.",
+groupId, updatedMember.memberId(), 
updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), 
updatedMember.state(),
+formatAssignment(updatedMember.assignedPartitions()), 
formatAssignment(updatedMember.revokedPartitions()));
+
+if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+scheduleConsumerGroupRebalanceTimeout(
+groupId,
+updatedMember.memberId(),
+updatedMember.memberEpoch(),
+updatedMember.rebalanceTimeoutMs()
+);
+} else {

Review Comment:
   I see. I think it is a little confusing for people trying to understand the 
algorithm, but I agree that it is important to have it apply and be used in a 
way that makes sense.
   
   So just so I get it straight -- we have a timeout only for the revocation of 
the partitions. If we hit the timeout, the member is fenced and I assume we can 
count those partitions as revoked. (Or we no longer have to wait for them to be 
revoked)
   
   At the point all partitions are revoked, we can try to assign new partitions 
to members. In this case, are we relying on the heartbeat to kick out members 
if they aren't responding? And do we expect that the assignment will occur as 
long as requests are going through?
   
   Just trying to confirm the reason for the separate timeout here. Is it 
because revoking is more likely to fail even though the heartbeat still goes 
through, but not the same for assigning?
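
A rough sketch of the behavior being asked about, under stated assumptions: a timeout exists only while a member still has partitions to revoke. The `else` branch is cut off in the hunk above, so cancelling the timeout there is an assumption, as are the class `RevocationTimeoutSketch`, the `OTHER` state, and the deadline map. Only `UNREVOKED_PARTITIONS` is taken from the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RevocationTimeoutSketch {
    // UNREVOKED_PARTITIONS is the state named in the diff; OTHER is a hypothetical placeholder.
    enum MemberState { UNREVOKED_PARTITIONS, OTHER }

    // memberId -> absolute deadline in ms (hypothetical stand-in for the coordinator timer).
    private final Map<String, Long> deadlines = new HashMap<>();

    void onNewAssignmentState(String memberId, MemberState state, long nowMs, long rebalanceTimeoutMs) {
        if (state == MemberState.UNREVOKED_PARTITIONS) {
            // The member must give partitions back: start the revocation clock.
            deadlines.put(memberId, nowMs + rebalanceTimeoutMs);
        } else {
            // Assumption: nothing is left to revoke, so no revocation timeout is kept.
            deadlines.remove(memberId);
        }
    }

    /** Members whose revocation deadline has passed and would therefore be fenced. */
    List<String> expiredMembers(long nowMs) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : deadlines.entrySet()) {
            if (entry.getValue() <= nowMs) {
                expired.add(entry.getKey());
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        RevocationTimeoutSketch sketch = new RevocationTimeoutSketch();
        sketch.onNewAssignmentState("m1", MemberState.UNREVOKED_PARTITIONS, 0L, 100L);
        System.out.println(sketch.expiredMembers(100L));
    }
}
```

In this model a member that is only waiting for new partitions never appears in `expiredMembers`, which is why liveness in that phase would have to come from the regular heartbeat/session mechanism.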






[jira] [Commented] (KAFKA-16354) FinalizedFeatureChangeListenerTest should use mocked latches

2024-03-12 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825871#comment-17825871
 ] 

PoAn Yang commented on KAFKA-16354:
---

Hi [~gharris1727], it looks like 
`testCacheUpdateWaitFailsForUnreachableVersion` and 
`testInitFailureDueToFeatureIncompatibility` do not both wait in 
CountDownLatch#await.

`testCacheUpdateWaitFailsForUnreachableVersion` waits in 
ZkMetadataCache#waitUntilFeatureEpochOrThrow, which reads System.nanoTime 
directly, so we can't use a mocked latch for it.

`testInitFailureDueToFeatureIncompatibility` waits with a CountDownLatch. 
However, we may need to change the input of 
FinalizedFeatureChangeListener#initOrThrow so that a mocked latch can be passed 
in. Do we want to do that? Thank you.
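
If the method were changed to accept a latch, a test could pass in a stub whose timed `await` reports a timeout right away, so the failure path runs without a real wait. The sketch below is only an illustration: `awaitOrThrow` and `immediateTimeoutLatch` are hypothetical names, not the existing Kafka methods, and a hand-written subclass is used in place of a mocking library.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class LatchInjectionSketch {
    /** Hypothetical method under test: blocks until the latch opens or the timeout elapses. */
    static void awaitOrThrow(CountDownLatch latch, long timeoutMs) throws InterruptedException {
        if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new IllegalStateException("Timed out after " + timeoutMs + " ms");
        }
    }

    /** Test stub: reports a timeout immediately instead of sleeping. */
    static CountDownLatch immediateTimeoutLatch() {
        return new CountDownLatch(1) {
            @Override
            public boolean await(long timeout, TimeUnit unit) {
                return false;
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            awaitOrThrow(immediateTimeoutLatch(), 30_000L);
        } catch (IllegalStateException e) {
            System.out.println("failed fast: " + e.getMessage());
        }
    }
}
```

This does not help the `System.nanoTime` case, which would need an injectable clock instead.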

> FinalizedFeatureChangeListenerTest should use mocked latches
> 
>
>                 Key: KAFKA-16354
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16354
>             Project: Kafka
>          Issue Type: Test
>          Components: core
>    Affects Versions: 3.7.0
>            Reporter: Greg Harris
>            Priority: Trivial
>              Labels: newbie
>
> testCacheUpdateWaitFailsForUnreachableVersion takes 30 seconds, and 
> testInitFailureDueToFeatureIncompatibility takes 5 seconds. This appears to 
> be caused by FeatureCacheUpdater#awaitUpdateOrThrow waiting for a real 
> CountDownLatch with a real-time timeout.
> Instead, a mocked latch should be used to exit the await immediately.
> This should be done both for CPU-independence, and for test execution speed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-12 Thread via GitHub


chia7712 commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1522318718


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -293,11 +293,11 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
 
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
             maxTimestamp = now;
-            offsetOfMaxTimestamp = initialOffset;
-        }
-
-        if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
-            offsetOfMaxTimestamp = offsetCounter.value - 1;
+            if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
+                offsetOfMaxTimestamp = offsetCounter.value - 1;

Review Comment:
   It seems KIP-734 does not define the case we are discussing: whether to 
return the initial or the last offset when multiple records have the same 
"max timestamp".
   
   So how do we deal with it? Is it good practice to discuss compatibility? Or 
do we treat it as a "bug fix" and add enough documentation to explain the 
"correct behavior"?
   
   I prefer the consistent behavior. Hence, it seems to me that adding 
documentation to `OffsetSpec.MaxTimestampSpec` is enough here. The use cases 
that depend on the previous behavior (last offset) should be rewritten for this 
bug fix (and behavior change).
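
The choice between the two behaviors comes down to how ties are broken while scanning a batch. A minimal sketch, using a hypothetical helper `firstOffsetOfMaxTimestamp` over a plain array of timestamps rather than Kafka's record classes:

```java
final class MaxTimestampOffsetSketch {
    /** Returns {maxTimestamp, offsetOfMaxTimestamp}, or {-1, -1} when there are no records. */
    static long[] firstOffsetOfMaxTimestamp(long baseOffset, long[] timestamps) {
        long maxTimestamp = -1L;
        long offsetOfMaxTimestamp = -1L;
        for (int i = 0; i < timestamps.length; i++) {
            // Strict '>' keeps the earliest offset when several records tie on the
            // max timestamp; '>=' would report the last tied offset instead.
            if (timestamps[i] > maxTimestamp) {
                maxTimestamp = timestamps[i];
                offsetOfMaxTimestamp = baseOffset + i;
            }
        }
        return new long[] {maxTimestamp, offsetOfMaxTimestamp};
    }

    public static void main(String[] args) {
        long[] result = firstOffsetOfMaxTimestamp(100L, new long[] {5L, 9L, 9L, 3L});
        System.out.println(result[0] + " @ " + result[1]);
    }
}
```

With timestamps `{5, 9, 9, 3}` starting at offset 100, the helper reports timestamp 9 at offset 101; with `>=` it would report offset 102.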






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-12 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1522316891


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/MemberState.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The various states that a member can be in. For their definition,
+ * refer to the documentation of {{@link CurrentAssignmentBuilder}}.

Review Comment:
   Ok -- really minor nit, but I guess it is less so in the documentation of 
the file and more in the comments of the file. Not sure if we care to make that 
distinction, so I will leave it to you to decide.






Re: [PR] MINOR; Make string from array [kafka]

2024-03-12 Thread via GitHub


jolshan commented on code in PR #15526:
URL: https://github.com/apache/kafka/pull/15526#discussion_r1522315310


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1564,7 +1564,7 @@ object LogManager {
   Option(newTopicsImage.getPartition(topicId, partitionId)) match {
 case Some(partition) =>
   if (!partition.replicas.contains(brokerId)) {
-info(s"Found stray log dir $log: the current replica assignment 
${partition.replicas} " +
+info(s"Found stray log dir $log: the current replica assignment 
${partition.replicas.mkString("[", ", ", "]")} " +

Review Comment:
   Ok. I guess our suggestions are the same 🤷‍♀️. It was a bit less readable 
to me which is why I was confused.






Re: [PR] MINOR: simplify consumer logic [kafka]

2024-03-12 Thread via GitHub


mjsax commented on PR #15519:
URL: https://github.com/apache/kafka/pull/15519#issuecomment-1992854817

   The PR update does not show up right now. From the GitHub Status page:
   > Update - We're continuing to investigate an elevated number of pull 
requests that are out of sync on page load.
   Mar 13, 2024 - 00:12 UTC
   
   Just in case you are wondering.





Re: [PR] MINOR; Make string from array [kafka]

2024-03-12 Thread via GitHub


jsancio commented on code in PR #15526:
URL: https://github.com/apache/kafka/pull/15526#discussion_r1522311967


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1564,7 +1564,7 @@ object LogManager {
   Option(newTopicsImage.getPartition(topicId, partitionId)) match {
 case Some(partition) =>
   if (!partition.replicas.contains(brokerId)) {
-info(s"Found stray log dir $log: the current replica assignment 
${partition.replicas} " +
+info(s"Found stray log dir $log: the current replica assignment 
${partition.replicas.mkString("[", ", ", "]")} " +

Review Comment:
   Why? the `"["` is the starting string, `", "` is the separator and `"]"` is 
the ending string. `Array(1,2).mkString("[", ", ", "]")` returns `"[1, 2]"`.
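
For readers more at home on the Java side of the codebase, the three-argument `mkString(start, sep, end)` behaves much like `Collectors.joining(delimiter, prefix, suffix)`. The sketch below is only an analogue, with a made-up class and method name that do not come from the PR. It also shows that the output does not depend on how many replicas there are:

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Illustrative only: a Java analogue of Scala's mkString(start, sep, end).
class MkStringSketch {

    // Joins the ids with ", " and wraps the result in square brackets.
    static String format(int[] replicas) {
        return Arrays.stream(replicas)
                .mapToObj(Integer::toString)
                .collect(Collectors.joining(", ", "[", "]"));
    }

    public static void main(String[] args) {
        System.out.println(format(new int[] {1, 2})); // [1, 2]
        System.out.println(format(new int[] {5}));    // [5]
        System.out.println(format(new int[] {}));     // []
    }
}
```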






Re: [PR] MINOR: simplify consumer logic [kafka]

2024-03-12 Thread via GitHub


mjsax commented on PR #15519:
URL: https://github.com/apache/kafka/pull/15519#issuecomment-1992841168

   Thanks for all the input! -- Updated the PR accordingly.





Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-12 Thread via GitHub


mjsax commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1522304963


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
 try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
 TimestampedKeyAndJoinSide prevKey = null;
 
+boolean outerJoinLeftBreak = false;
+boolean outerJoinRightBreak = false;
 while (it.hasNext()) {
-boolean outerJoinLeftBreak = false;
-boolean outerJoinRightBreak = false;

Review Comment:
   > Will try to make a unit-test for this.
   
   Thanks! That's awesome!
   
   > The documentation says that the ordering of the KeyValueIterator is NOT 
guaranteed.
   
   But this is internal, right? This helper store is not pluggable, so I think 
we could rely on ordering? -- We do store the data (ie, key) as 
``, so we should be able to break the loop 
when we get the first timestamp which is too large (regardless of whether it's 
a left or right record).
   
   cf `TimestampedKeyAndJoinSideSerializer.java`
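
A minimal sketch of that early-exit idea, assuming the store really does iterate in ascending timestamp order. A `TreeMap` stands in for the internal store here, and `emitExpired` and `windowCloseTime` are hypothetical names, not the actual `KStreamKStreamJoin` code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a TreeMap plays the role of a timestamp-ordered store.
class OrderedScanSketch {

    // Collects values whose timestamp is at or before windowCloseTime and
    // stops at the first entry that is too new; later entries can only be newer.
    static List<String> emitExpired(TreeMap<Long, String> store, long windowCloseTime) {
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<Long, String> entry : store.entrySet()) {
            if (entry.getKey() > windowCloseTime) {
                break; // ordering makes scanning the rest unnecessary
            }
            emitted.add(entry.getValue());
        }
        return emitted;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> store = new TreeMap<>();
        store.put(30L, "right-c");
        store.put(10L, "left-a");
        store.put(20L, "right-b");
        System.out.println(emitExpired(store, 20L)); // [left-a, right-b]
    }
}
```

If the iteration order were not guaranteed, the `break` would have to become a `continue`, and every entry would need to be visited.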






Re: [PR] MINOR; Make string from array [kafka]

2024-03-12 Thread via GitHub


jolshan commented on code in PR #15526:
URL: https://github.com/apache/kafka/pull/15526#discussion_r1522294411


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1564,7 +1564,7 @@ object LogManager {
   Option(newTopicsImage.getPartition(topicId, partitionId)) match {
 case Some(partition) =>
   if (!partition.replicas.contains(brokerId)) {
-info(s"Found stray log dir $log: the current replica assignment 
${partition.replicas} " +
+info(s"Found stray log dir $log: the current replica assignment 
${partition.replicas.mkString("[", ", ", "]")} " +

Review Comment:
   Does it make sense to be
   `[${partition.replicas.mkString(",")}]`






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-12 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1522294617


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -19,82 +19,230 @@ package kafka.admin
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 
   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
-createTopic(topicName, 1, 1.toShort)
-produceMessages()
+createTopicWithConfig(topicName, new Properties())
 adminClient = Admin.create(Map[String, Object](
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
 ).asJava)
   }
 
   @AfterEach
   override def tearDown(): Unit = {
+setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+produceMessagesInOneBatch("gzip")
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+// In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
+// So in this one batch test, it'll be the first offset 0
+verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testLatestOffset(quorum: String): Unit = {
-val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
-assertEquals(3, latestOffset.offset())
+  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+produceMessagesInSeparateBatch()
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs)
+// In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
+// So in this separate batch test, it'll be the last offset 2
+verifyListOffsets(topic = topicNameWithCustomConfigs, 2)

Review Comment:
   Good point! Let me improve it!






Re: [PR] MINOR; Make string from array [kafka]

2024-03-12 Thread via GitHub


jolshan commented on code in PR #15526:
URL: https://github.com/apache/kafka/pull/15526#discussion_r1522293491


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1564,7 +1564,7 @@ object LogManager {
   Option(newTopicsImage.getPartition(topicId, partitionId)) match {
 case Some(partition) =>
   if (!partition.replicas.contains(brokerId)) {
-info(s"Found stray log dir $log: the current replica assignment 
${partition.replicas} " +
+info(s"Found stray log dir $log: the current replica assignment 
${partition.replicas.mkString("[", ", ", "]")} " +

Review Comment:
   are we assuming 3 replicas here?






[PR] MINOR; Make string from array [kafka]

2024-03-12 Thread via GitHub


jsancio opened a new pull request, #15526:
URL: https://github.com/apache/kafka/pull/15526

   If toString is called on an array it returns the string representing the 
object reference.  Use mkString instead to print the content of the array.
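
As a rough illustration of the problem on the JVM (a Scala `Array[Int]` is a plain Java `int[]` underneath), the sketch below contrasts the inherited `Object.toString` with a content-aware rendering. The class and method names are made up for the example:

```java
import java.util.Arrays;

// Illustrative only: shows why logging an array directly is unhelpful.
class ArrayToStringSketch {

    // Inherited Object.toString: a type tag plus an identity hash, e.g. "[I@1b6d3586".
    static String raw(int[] replicas) {
        return replicas.toString();
    }

    // Content-aware rendering of the same array.
    static String readable(int[] replicas) {
        return Arrays.toString(replicas);
    }

    public static void main(String[] args) {
        int[] replicas = {1, 2, 3};
        System.out.println(raw(replicas));      // starts with "[I@", the rest varies per run
        System.out.println(readable(replicas)); // [1, 2, 3]
    }
}
```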
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]

2024-03-12 Thread via GitHub


hgeraldino commented on PR #15506:
URL: https://github.com/apache/kafka/pull/15506#issuecomment-1992692291

   Thanks @OmniaGM for your review





Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-12 Thread via GitHub


junrao commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1522196043


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -293,11 +293,11 @@ public ValidationResult 
assignOffsetsNonCompressed(LongRef offsetCounter,
 
 if (timestampType == TimestampType.LOG_APPEND_TIME) {
 maxTimestamp = now;
-offsetOfMaxTimestamp = initialOffset;
-}
-
-if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
-offsetOfMaxTimestamp = offsetCounter.value - 1;
+if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
+offsetOfMaxTimestamp = offsetCounter.value - 1;

Review Comment:
   @johnnychhsu : Yes, that was the original code. But the issue is that that 
code was no longer correct when 
[KIP-734](https://cwiki.apache.org/confluence/display/KAFKA/KIP-734%3A+Improve+AdminClient.listOffsets+to+return+timestamp+and+offset+for+the+record+with+the+largest+timestamp)
 was added. Without KIP-734, it's ok to set offsetOfMaxTimestamp to the last 
offset for magic V2. However, with KIP-734, it's important to set 
offsetOfMaxTimestamp to the first offset for both magic V1 and V2. It seems 
that we missed that part when adding KIP-734. So, I think the logic should be 
if `timestampType == TimestampType.LOG_APPEND_TIME`, we always set 
`offsetOfMaxTimestamp` to `initialOffset`.
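
The proposed rule can be sketched as below. This is a reduced model, not the actual `LogValidator` code: the enum, the method, and its parameters are stand-ins, and the `CREATE_TIME` branch simply returns whatever offset the caller found while scanning the records.

```java
// Illustrative only: a reduced model of the rule proposed in the comment above.
class OffsetOfMaxTimestampSketch {

    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    // With LOG_APPEND_TIME the max timestamp is the broker's append time, so the
    // first offset of the batch is reported regardless of the magic version.
    static long offsetOfMaxTimestamp(TimestampType type,
                                     long initialOffset,
                                     long offsetFoundByScan) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            return initialOffset;
        }
        return offsetFoundByScan;
    }

    public static void main(String[] args) {
        // A batch occupying offsets 100..102 whose largest create-time timestamp sits at 101.
        System.out.println(offsetOfMaxTimestamp(TimestampType.LOG_APPEND_TIME, 100L, 101L)); // 100
        System.out.println(offsetOfMaxTimestamp(TimestampType.CREATE_TIME, 100L, 101L));     // 101
    }
}
```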
   






Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]

2024-03-12 Thread via GitHub


hgeraldino commented on code in PR #15506:
URL: https://github.com/apache/kafka/pull/15506#discussion_r1522194636


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##
@@ -1215,17 +1195,13 @@ public void testSuppressCloseErrors() {
 
 workerTask.initialize(TASK_CONFIG);
 workerTask.initializeAndStart();
-try {
-workerTask.execute();
-fail("workerTask.execute should have thrown an exception");
-} catch (ConnectException e) {
-assertSame("Exception from put should be the cause", putException, 
e.getCause());
-assertTrue("Exception from close should be suppressed", 
e.getSuppressed().length > 0);
-assertSame(closeException, e.getSuppressed()[0]);
-}
+
+Throwable thrownException = assertThrows(ConnectException.class, () -> 
workerTask.execute());
+assertSame("Exception from put should be the cause", putException, 
thrownException.getCause());

Review Comment:
   This is a remnant of the original implementation. As you correctly guessed, 
there is no difference in this case between using `assertEquals` and 
`assertSame`: neither the `RuntimeException` class nor any of its parents 
override the `equals()` method, so they both are effectively checking for 
reference equality (as implemented by the base `Object` class)
   
   I'm happy to change these to `assertEquals` (this is the only place where 
this is used)
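
A small standalone check of that claim, written without JUnit so it can be run on its own. The class and helper names are hypothetical; the point is only that `RuntimeException` inherits `Object.equals`, so equality and identity agree:

```java
// Illustrative only: RuntimeException inherits Object.equals, which compares references.
class ExceptionEqualitySketch {

    // True when equals() and reference identity give the same answer for the pair.
    static boolean equalsMatchesIdentity(Object a, Object b) {
        return a.equals(b) == (a == b);
    }

    public static void main(String[] args) {
        RuntimeException putException = new RuntimeException("put failed");
        RuntimeException lookalike = new RuntimeException("put failed");

        System.out.println(putException.equals(putException)); // true
        System.out.println(putException.equals(lookalike));    // false, despite the same message
        System.out.println(equalsMatchesIdentity(putException, lookalike)); // true
    }
}
```

For a type that does override `equals()`, such as `String`, the two assertions could diverge, which is when the choice between `assertSame` and `assertEquals` starts to matter.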






Re: [PR] MINOR: Tweak streams config doc [kafka]

2024-03-12 Thread via GitHub


mjsax merged PR #15518:
URL: https://github.com/apache/kafka/pull/15518





Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-12 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1522184075


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamWindowCloseTest.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+
+public class KStreamKStreamWindowCloseTest {
+
+private static final String LEFT = "left";
+private static final String RIGHT = "right";
+private final static Properties PROPS = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+private static final Consumed CONSUMED = 
Consumed.with(Serdes.Integer(), Serdes.String());
+private static final JoinWindows WINDOW = 
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5));
+
+static List streams() {
+return Arrays.asList(
+innerJoin(),
+leftJoin(),
+outerJoin()
+);
+}
+
+private static Arguments innerJoin() {
+final StreamsBuilder builder = new StreamsBuilder();
+final KStream stream = builder.stream(LEFT, 
CONSUMED).join(
+builder.stream(RIGHT, CONSUMED),
+MockValueJoiner.TOSTRING_JOINER,
+WINDOW,
+StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+);
+final MockApiProcessorSupplier 
processorSupplier = new MockApiProcessorSupplier<>();
+stream.process(processorSupplier);
+return Arguments.of(builder.build(PROPS), processorSupplier);
+}
+
+private static Arguments leftJoin() {
+final StreamsBuilder builder = new StreamsBuilder();
+final KStream stream = builder.stream(LEFT, 
CONSUMED).leftJoin(
+builder.stream(RIGHT, CONSUMED),
+MockValueJoiner.TOSTRING_JOINER,
+WINDOW,
+StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+);
+final MockApiProcessorSupplier 
processorSupplier = new MockApiProcessorSupplier<>();
+stream.process(processorSupplier);
+return Arguments.of(builder.build(PROPS), processorSupplier);
+}
+
+private static Arguments outerJoin() {
+final StreamsBuilder builder = new StreamsBuilder();
+final KStream stream = builder.stream(LEFT, 
CONSUMED).outerJoin(
+builder.stream(RIGHT, CONSUMED),
+MockValueJoiner.TOSTRING_JOINER,
+WINDOW,
+StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+);
+final MockApiProcessorSupplier 
supplier = new MockApiProcessorSupplier<>();
+stream.process(supplier);
+return Arguments.of(builder.build(PROPS), supplier);
+}
+
+@ParameterizedTest
+@MethodSource("streams")
+public void recordsArrivingPostWindowCloseShouldBeDropped(
+final Topology topology,
+final MockApiProcessorSupplier 

Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-12 Thread via GitHub


junrao commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1522164475


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -19,82 +19,230 @@ package kafka.admin
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 
   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
-createTopic(topicName, 1, 1.toShort)
-produceMessages()
+createTopicWithConfig(topicName, new Properties())
 adminClient = Admin.create(Map[String, Object](
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
 ).asJava)
   }
 
   @AfterEach
   override def tearDown(): Unit = {
+setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+produceMessagesInOneBatch("gzip")
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+// In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
+// So in this one batch test, it'll be the first offset 0
+verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testLatestOffset(quorum: String): Unit = {
-val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
-assertEquals(3, latestOffset.offset())
+  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+produceMessagesInSeparateBatch()
+verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs)
+// In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
+// So in this separate batch test, it'll be the last offset 2
+verifyListOffsets(topic = topicNameWithCustomConfigs, 2)

Review Comment:
   Do we guarantee that the server time has advanced after appending each 
batch? Ditto for `testThreeRecordsInSeparateBatchWithMessageConversion`



##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +53,24 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before() {
 cluster.config().serverProperties().put("auto.create.topics.enable", 
false);
 
cluster.config().serverProperties().put("offsets.topic.replication.factor", 
"1");
 
cluster.config().serverProperties().put("offsets.topic.num.partitions", 
String.valueOf(offsetTopicPartitionCount));
+}
 
+public void setUp() {

Review Comment:
   Ok. Could we make this setUp() private?




Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-12 Thread via GitHub


nizhikov commented on PR #14471:
URL: https://github.com/apache/kafka/pull/14471#issuecomment-1992664069

   @chia7712 
   
   I ran system tests for command, also.
   System tests passed.
   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.11.4
   session_id:   2024-03-12--004
   run time: 5 minutes 22.467 seconds
   tests run:12
   passed:   12
   flaky:0
   failed:   0
   ignored:  0
   

   test_id:
kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_describe_consumer_group.security_protocol=PLAINTEXT.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
   status: PASS
   run time:   26.101 seconds
   

   test_id:
kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_describe_consumer_group.security_protocol=PLAINTEXT.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
   status: PASS
   run time:   25.893 seconds
   

   test_id:
kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_describe_consumer_group.security_protocol=PLAINTEXT.metadata_quorum=ZK.use_new_coordinator=False
   status: PASS
   run time:   21.294 seconds
   

   test_id:
kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_describe_consumer_group.security_protocol=SSL.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
   status: PASS
   run time:   31.293 seconds
   

   test_id:
kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_describe_consumer_group.security_protocol=SSL.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
   status: PASS
   run time:   31.091 seconds
   

   test_id:
kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_describe_consumer_group.security_protocol=SSL.metadata_quorum=ZK.use_new_coordinator=False
   status: PASS
   run time:   25.902 seconds
   

   test_id:
kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_list_consumer_groups.security_protocol=PLAINTEXT.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
   status: PASS
   run time:   25.983 seconds
   

   test_id:
kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_list_consumer_groups.security_protocol=PLAINTEXT.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
   status: PASS
   run time:   26.154 seconds
   

   test_id:
kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_list_consumer_groups.security_protocol=PLAINTEXT.metadata_quorum=ZK.use_new_coordinator=False
   status: PASS
   run time:   20.705 seconds
   

   test_id:
kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_list_consumer_groups.security_protocol=SSL.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
   status: PASS
   run time:   30.682 seconds
   

   test_id:
kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_list_consumer_groups.security_protocol=SSL.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
   status: PASS
   run time:   30.673 seconds
   

   test_id:
kafkatest.tests.core.consumer_group_command_test.ConsumerGroupCommandTest.test_list_consumer_groups.security_protocol=SSL.metadata_quorum=ZK.use_new_coordinator=False
   status: PASS
   run time:   26.424 seconds
   

   nizhikov@v10792:~/kafka$ git status
   On branch KAFKA-14589
   Your branch is up to date with 'origin/KAFKA-14589'.
   
   nothing to commit, working tree clean
   nizhikov@v10792:~/kafka$ git remote -v
   origin   https://github.com/nizhikov/kafka.git (fetch)
   origin   https://github.com/nizhikov/kafka.git (push)
   ```



Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-12 Thread via GitHub


nizhikov commented on code in PR #14471:
URL: https://github.com/apache/kafka/pull/14471#discussion_r1522059985


##
build.gradle:
##
@@ -1999,10 +1999,14 @@ project(':tools') {
 implementation project(':clients')
 implementation project(':storage')
 implementation project(':server-common')
+implementation project(':server')

Review Comment:
   Great catch. Thanks!
   It seems this dependency is not required. Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective

2024-03-12 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825794#comment-17825794
 ] 

Ismael Juma commented on KAFKA-16359:
-

[~norrisjeremy] it's clearly a bug - there is no debate about that. But it's 
good to confirm the impact and whether there is a workaround. That will 
influence the urgency for the new release (i.e. does it have to be immediate or 
can it wait a week or two so that other important issues are fixed as well). 
Thanks for confirming the specifics.

> kafka-clients-3.7.0.jar published to Maven Central is defective
> ---
>
> Key: KAFKA-16359
> URL: https://issues.apache.org/jira/browse/KAFKA-16359
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
>Reporter: Jeremy Norris
>Assignee: Apoorv Mittal
>Priority: Critical
>
> The {{kafka-clients-3.7.0.jar}} that has been published to Maven Central is 
> defective: its {{META-INF/MANIFEST.MF}} bogusly includes a {{Class-Path}} 
> element:
> {code}
> Manifest-Version: 1.0
> Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10
> .5.jar slf4j-api-1.7.36.jar
> {code}
> This bogus {{Class-Path}} element leads to compiler warnings for projects 
> that utilize it as a dependency:
> {code}
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar": 
> no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar":
>  no such file or directory
> {code}
> Either the {{kafka-clients-3.7.0.jar}} needs to be respun and published 
> without the bogus {{Class-Path}} element in its {{META-INF/MANIFEST.MF}} or 
> a new release should be published that corrects this defect.
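
To confirm the defect locally, the manifest can be parsed with the JDK's own `java.util.jar.Manifest` class. The sketch below is illustrative only: the class name `ManifestClassPathCheck` and the helper `classPathOf` are hypothetical, and the manifest text is copied from the report above rather than read from the published jar. The parser joins the continuation line (the one starting with a space) back onto `snappy-java-1.1.10`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

// Hypothetical helper, not part of Kafka: reports the Class-Path main attribute.
class ManifestClassPathCheck {

    // Returns the Class-Path value declared in the manifest text, or null if absent.
    static String classPathOf(String manifestText) {
        try {
            Manifest manifest = new Manifest(
                new ByteArrayInputStream(manifestText.getBytes(StandardCharsets.UTF_8)));
            return manifest.getMainAttributes().getValue(Attributes.Name.CLASS_PATH);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Manifest content as quoted in the issue; the second Class-Path line
        // is a continuation line and therefore starts with a single space.
        String reported =
            "Manifest-Version: 1.0\n"
            + "Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10\n"
            + " .5.jar slf4j-api-1.7.36.jar\n";
        System.out.println("Class-Path: " + classPathOf(reported));
    }
}
```

For a real artifact, `new java.util.jar.JarFile(path).getManifest()` returns the same `Manifest` type, so the same attribute lookup applies.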



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-12 Thread via GitHub


chia7712 commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1522051894


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -243,33 +243,20 @@ public MemoryRecords build() {
 /**
  * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
  *
- * If the log append time is used, the offset will be the last offset 
unless no compression is used and

Review Comment:
   Could you please update `OffsetSpec` also? 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java#L65






[PR] Ctr end offsets shoud not throw [kafka]

2024-03-12 Thread via GitHub


philipnee opened a new pull request, #15525:
URL: https://github.com/apache/kafka/pull/15525

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective

2024-03-12 Thread Apoorv Mittal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825772#comment-17825772
 ] 

Apoorv Mittal commented on KAFKA-16359:
---

Until the fix is released, the workaround below may suppress the warning (not 
good practice, but a stopgap for now):
{code:java}
        org.apache.maven.plugins
        maven-compiler-plugin
        
          
          true
          true
          true
          
            ...
...
            
            -Xlint:-path
...
   {code}
A similar shadow-jar issue was encountered by LaunchDarkly and others; see 
https://github.com/launchdarkly/java-server-sdk/issues/240






[jira] [Commented] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective

2024-03-12 Thread Jeremy Norris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825770#comment-17825770
 ] 

Jeremy Norris commented on KAFKA-16359:
---

It's broken because users that attempt to utilize {{kafka-clients-3.7.0.jar}} 
as a dependency in their projects will generate spurious compiler warnings for 
something that isn't a defect in their code.
And if they also utilize the {{-Werror}} compiler option to promote compiler 
warnings into errors (as some users are wont to do if they enforce cleanliness 
in their code), their projects & CI pipelines will fail.
Bottom line: users should not have to adjust compiler settings in order to 
consume the artifact.






[jira] [Commented] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective

2024-03-12 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825767#comment-17825767
 ] 

Ismael Juma commented on KAFKA-16359:
-

The original message said there were warnings. But the latest says the 
artifacts are broken. Can you clarify the impact please?






Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-12 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1521977816


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -42,6 +42,11 @@ object AddPartitionsToTxnManager {
   val VerificationTimeMsMetricName = "VerificationTimeMs"
 }
 
+object OperationExpected extends Enumeration {
+  type operation = Value
+  val defaultOperation, latestProduceVersion, addPartitionWithoutVerification 
= Value

Review Comment:
   I also think in scala we typically use sealed traits and case objects. 
   
   See 
https://github.com/apache/kafka/blob/58ddd693e69599b177d09c2e384f31e7f5e11171/core/src/main/scala/kafka/controller/KafkaController.scala#L59






Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-12 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1521974720


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -42,6 +42,11 @@ object AddPartitionsToTxnManager {
   val VerificationTimeMsMetricName = "VerificationTimeMs"
 }
 
+object OperationExpected extends Enumeration {
+  type operation = Value
+  val defaultOperation, latestProduceVersion, addPartitionWithoutVerification 
= Value

Review Comment:
   I think we could have something like `default`, `genericError`, and 
`addPartition`? 






[jira] [Commented] (KAFKA-16352) Transaction may get get stuck in PrepareCommit or PrepareAbort state

2024-03-12 Thread Artem Livshits (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825766#comment-17825766
 ] 

Artem Livshits commented on KAFKA-16352:


That's correct: the transaction is fully completed but cannot transition 
into the complete state because the internal state is broken by a race condition.

> Transaction may get get stuck in PrepareCommit or PrepareAbort state
> 
>
> Key: KAFKA-16352
> URL: https://issues.apache.org/jira/browse/KAFKA-16352
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Artem Livshits
>Assignee: Artem Livshits
>Priority: Major
>
> A transaction took a long time to complete, and trying to restart a producer 
> led to CONCURRENT_TRANSACTION errors.  Investigation showed that 
> the transaction was stuck in PrepareCommit for a few days 
> (current time when the investigation happened: Feb 27 2024). Transaction 
> state:
> {{Type   |Name                  |Value}}
> {{-}}
> {{ref    |transactionalId       |xxx-yyy}}
> {{long   |producerId            |299364}}
> {{ref    |state                 |kafka.coordinator.transaction.PrepareCommit$ 
> @ 0x44fe22760}}
> {{long   |txnStartTimestamp     |1708619624810  Thu Feb 22 2024 16:33:44.810 
> GMT+}}
> {{long   |txnLastUpdateTimestamp|1708619625335  Thu Feb 22 2024 16:33:45.335 
> GMT+}}
> {{-}}
> The partition list was empty and transactionsWithPendingMarkers didn't 
> contain the reference to the transactional state.  In the log there were the 
> following relevant messages:
> {{22 Feb 2024 @ 16:33:45.623 UTC [Transaction State Manager 1]: Completed 
> loading transaction metadata from __transaction_state-3 for coordinator epoch 
> 611}}
> (this is the partition that contains the transactional id).  After the data 
> is loaded, the markers are sent out, and so on.
> Then there is this message:
> {{22 Feb 2024 @ 16:33:45.696 UTC [Transaction Marker Request Completion 
> Handler 4]: Transaction coordinator epoch for xxx-yyy has changed from 610 to 
> 611; cancel sending transaction markers TxnMarkerEntry{producerId=299364, 
> producerEpoch=1005, coordinatorEpoch=610, result=COMMIT, 
> partitions=[foo-bar]} to the brokers}}
> This message is logged just before the state is removed from 
> transactionsWithPendingMarkers, but the state apparently contained the entry 
> that was created by the load operation.  So the sequence of events probably 
> looked like the following:
>  # partition load completed
>  # commit markers were sent for transactional id xxx-yyy; entry in 
> transactionsWithPendingMarkers was created
>  # zombie reply from the previous epoch completed, removed entry from 
> transactionsWithPendingMarkers
>  # commit markers properly completed, but couldn't transition to 
> CommitComplete state because transactionsWithPendingMarkers didn't have the 
> proper entry, so it got stuck there until the broker was restarted
> Looking at the code there are a few cases that could lead to similar race 
> conditions.  The fix is to keep track of the PendingCompleteTxn value that 
> was used when sending the marker, so that we can only remove the state that 
> was created when the marker was sent and not accidentally remove the state 
> someone else created.





Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]

2024-03-12 Thread via GitHub


artemlivshits commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1521968243


##
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##
@@ -354,41 +366,42 @@ class TransactionMarkerChannelManager(
   txnLogAppend.newMetadata, appendCallback, _ == 
Errors.COORDINATOR_NOT_AVAILABLE, RequestLocal.NoCaching)
   }
 
-  def addTxnMarkersToBrokerQueue(transactionalId: String,
- producerId: Long,
+  def addTxnMarkersToBrokerQueue(producerId: Long,
  producerEpoch: Short,
  result: TransactionResult,
- coordinatorEpoch: Int,
+ pendingCompleteTxn: PendingCompleteTxn,

Review Comment:
   pendingCompleteTxn has transactional id and coordinator epoch, so we don't 
need to pass them explicitly.



##
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##
@@ -109,23 +109,30 @@ object TransactionMarkerChannelManager {
 
 }
 
-class TxnMarkerQueue(@volatile var destination: Node) {
+class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
 
   // keep track of the requests per txn topic partition so we can easily clear 
the queue
   // during partition emigration
-  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, 
BlockingQueue[TxnIdAndMarkerEntry]]().asScala
+  private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, 
BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala

Review Comment:
   Now we keep track of the PendingCompleteTxn that was added in the 
transactionsWithPendingMarkers.



##
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala:
##
@@ -419,25 +432,34 @@ class TransactionMarkerChannelManager(
 
   def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = {
 
markersQueueForUnknownBroker.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach
 { queue =>
-  for (entry: TxnIdAndMarkerEntry <- queue.asScala)
-removeMarkersForTxnId(entry.txnId)
+  for (entry <- queue.asScala) {
+info(s"Removing $entry for txn partition $txnTopicPartitionId to 
destination broker -1")
+removeMarkersForTxn(entry.pendingCompleteTxn)
+  }
 }
 
-markersQueuePerBroker.foreach { case(_, brokerQueue) =>
+markersQueuePerBroker.foreach { case(brokerId, brokerQueue) =>
   
brokerQueue.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { 
queue =>
-for (entry: TxnIdAndMarkerEntry <- queue.asScala)
-  removeMarkersForTxnId(entry.txnId)
+for (entry <- queue.asScala) {
+  info(s"Removing $entry for txn partition $txnTopicPartitionId to 
destination broker $brokerId")
+  removeMarkersForTxn(entry.pendingCompleteTxn)
+}
   }
 }
   }
 
-  def removeMarkersForTxnId(transactionalId: String): Unit = {
-transactionsWithPendingMarkers.remove(transactionalId)
+  def removeMarkersForTxn(pendingCompleteTxn: PendingCompleteTxn): Unit = {
+val transactionalId = pendingCompleteTxn.transactionalId
+val removed = transactionsWithPendingMarkers.remove(transactionalId, 
pendingCompleteTxn)

Review Comment:
   This is the core change: use the original pendingCompleteTxn value to 
remove the entry.  The rest of the change is pretty much plumbing so that we 
can supply the correct pendingCompleteTxn.
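
The difference between removing by key and removing by key and value can be sketched in plain Java with `ConcurrentHashMap`. This is a simplified illustration, not the code in the PR: `PendingTxn` and `PendingMarkerRemovalSketch` are hypothetical stand-ins, the comparison here is by object identity (an assumption about how entries are matched), and the epochs 610 and 611 are borrowed from the log lines in KAFKA-16352.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for PendingCompleteTxn; only the fields needed here.
final class PendingTxn {
    final String transactionalId;
    final int coordinatorEpoch;

    PendingTxn(String transactionalId, int coordinatorEpoch) {
        this.transactionalId = transactionalId;
        this.coordinatorEpoch = coordinatorEpoch;
    }
}

class PendingMarkerRemovalSketch {

    // Remove by key only: a late completion from epoch 610 also wipes the
    // entry that epoch 611 registered. Returns whether that entry survived.
    static boolean survivesRemoveByKey() {
        ConcurrentHashMap<String, PendingTxn> pending = new ConcurrentHashMap<>();
        PendingTxn oldEpoch = new PendingTxn("xxx-yyy", 610);
        PendingTxn newEpoch = new PendingTxn("xxx-yyy", 611);
        pending.put(newEpoch.transactionalId, newEpoch);
        pending.remove(oldEpoch.transactionalId);
        return pending.containsKey("xxx-yyy");
    }

    // Remove by key and value: the late completion only removes the entry it
    // created itself, so the entry from epoch 611 stays in place.
    static boolean survivesRemoveByKeyAndValue() {
        ConcurrentHashMap<String, PendingTxn> pending = new ConcurrentHashMap<>();
        PendingTxn oldEpoch = new PendingTxn("xxx-yyy", 610);
        PendingTxn newEpoch = new PendingTxn("xxx-yyy", 611);
        pending.put(newEpoch.transactionalId, newEpoch);
        pending.remove(oldEpoch.transactionalId, oldEpoch);
        return pending.containsKey("xxx-yyy");
    }

    public static void main(String[] args) {
        System.out.println("remove(key): entry survives = " + survivesRemoveByKey());
        System.out.println("remove(key, value): entry survives = " + survivesRemoveByKeyAndValue());
    }
}
```

The two-argument `remove` is a no-op when the mapped value does not match, which is what prevents a zombie reply from deleting state it did not create.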






Re: [PR] KAFKA-16318 : add javafoc for kafka metric [kafka]

2024-03-12 Thread via GitHub


chia7712 commented on code in PR #15483:
URL: https://github.com/apache/kafka/pull/15483#discussion_r1521949713


##
clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java:
##
@@ -40,15 +48,28 @@ public KafkaMetric(Object lock, MetricName metricName, 
MetricValueProvider va
 this.time = time;
 }
 
+/**
+ * Get the configuration of this metric
+ * @return Return the config of this metric
+ */
 public MetricConfig config() {
 return this.config;

Review Comment:
   This is unrelated to this PR, but should we add a lock to this method? It seems 
we assume the config can be read and written concurrently.
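
A minimal sketch of what guarding the getter could look like, assuming the same lock object also protects writes. `GuardedMetricSketch` is a hypothetical stand-in and a `String` takes the place of `MetricConfig`; this is not the actual `KafkaMetric` code.

```java
// Hypothetical illustration only: a String stands in for MetricConfig.
class GuardedMetricSketch {
    private final Object lock;
    private String config;

    GuardedMetricSketch(Object lock, String config) {
        this.lock = lock;
        this.config = config;
    }

    // Reading under the lock gives the reader a happens-before edge with writers.
    String config() {
        synchronized (lock) {
            return config;
        }
    }

    // Writing under the same lock keeps reads and writes mutually exclusive.
    void config(String config) {
        synchronized (lock) {
            this.config = config;
        }
    }

    public static void main(String[] args) {
        GuardedMetricSketch metric = new GuardedMetricSketch(new Object(), "initial");
        metric.config("updated");
        System.out.println(metric.config());
    }
}
```

Whether the extra synchronization is worth it depends on how often the config is read; declaring the field `volatile` would be another option for plain visibility.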






[jira] [Resolved] (KAFKA-9690) MemoryLeak in JMX Reporter

2024-03-12 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-9690.
---
Resolution: Duplicate

Closing this as a duplicate of https://issues.apache.org/jira/browse/KAFKA-9306

> MemoryLeak in JMX Reporter
> --
>
> Key: KAFKA-9690
> URL: https://issues.apache.org/jira/browse/KAFKA-9690
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.4.0
>Reporter: Kaare Nilsen
>Priority: Major
> Attachments: image-2020-03-10-12-37-49-259.png, 
> image-2020-03-10-12-44-11-688.png
>
>
> We use Kafka in a streaming HTTP application that creates a new consumer for each 
> incoming request. In version 2.4.0, memory builds up with each new consumer. 
> A memory dump pointed to the JMX subsystem, and after debugging we found that 
> one of the JMX beans (kafka.consumer) accumulates consumer-metrics entries 
> without releasing them when the consumer is closed.
> What we found is that, in metricRemoval:
> {code:java}
> public void metricRemoval(KafkaMetric metric) {
>     synchronized (LOCK) {
>         MetricName metricName = metric.metricName();
>         String mBeanName = getMBeanName(prefix, metricName);
>         KafkaMbean mbean = removeAttribute(metric, mBeanName);
>         if (mbean != null) {
>             if (mbean.metrics.isEmpty()) {
>                 unregister(mbean);
>                 mbeans.remove(mBeanName);
>             } else
>                 reregister(mbean);
>         }
>     }
> }
> {code}
> The check mbean.metrics.isEmpty() for this particular metric never yielded 
> true, so the mbean was never removed and the mbeans HashMap kept growing.
> The metrics that are not released are:
> {code:java}
> last-poll-seconds-ago
> poll-idle-ratio-avg
> time-between-poll-avg
> time-between-poll-max
> {code}
> I have a workaround in my code now: a modified JMXReporter in my own 
> project with the following close method
> {code:java}
> public void close() {
>     synchronized (LOCK) {
>         for (KafkaMbean mbean : this.mbeans.values()) {
>             mbean.removeAttribute("last-poll-seconds-ago");
>             mbean.removeAttribute("poll-idle-ratio-avg");
>             mbean.removeAttribute("time-between-poll-avg");
>             mbean.removeAttribute("time-between-poll-max");
>             unregister(mbean);
>         }
>     }
> }
> {code}
> This will remove the attributes that are not cleaned up and prevent the 
> memory leak, but I have not found the root cause.
> Another workaround is to use Kafka client 2.3.1.
>  
> This is how it looks in the JMX console after a couple of clients have 
> connected and disconnected. Here you can see that the one metric builds up 
> and the old ones have the four attributes that make the unregister fail.
>  
> !image-2020-03-10-12-37-49-259.png!
>  
> This is how it looks after a while in Kafka client 2.3.1:
> !image-2020-03-10-12-44-11-688.png!
> As you can see, there is no leak here.
> I suspect this change to be the one that introduced the leak: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-517%3A+Add+consumer+metrics+to+observe+user+poll+behavior]
> https://issues.apache.org/jira/browse/KAFKA-8874





[jira] [Resolved] (KAFKA-9504) Memory leak in KafkaMetrics registered to MBean

2024-03-12 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-9504.
---
Resolution: Duplicate

Closing this as a duplicate of https://issues.apache.org/jira/browse/KAFKA-9306

> Memory leak in KafkaMetrics registered to MBean
> ---
>
> Key: KAFKA-9504
> URL: https://issues.apache.org/jira/browse/KAFKA-9504
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.0
>Reporter: Andreas Holmén
>Priority: Major
>
> After close() is called on a KafkaConsumer, some registered MBeans are not 
> unregistered, causing a leak.
>  
>  
> {code:java}
> import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
> import java.lang.management.ManagementFactory;
> import java.util.HashMap;
> import java.util.Map;
> import javax.management.MBeanServer;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
> public class Leaker {
>     private static String bootstrapServers = "hostname:9092";
>  
>     public static void main(String[] args) throws InterruptedException {
>         MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
>         Map<String, Object> props = new HashMap<>();
>         props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>  
>         // Count registered MBeans before and after creating and closing consumers.
>         int beans = mBeanServer.getMBeanCount();
>         for (int i = 0; i < 100; i++) {
>             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props,
>                 new ByteArrayDeserializer(), new ByteArrayDeserializer());
>             consumer.close();
>         }
>         int newBeans = mBeanServer.getMBeanCount();
>         // A non-zero delta means some MBeans were not unregistered on close().
>         System.out.println("\nbeans delta: " + (newBeans - beans));
>     }
> }
> {code}
>  
>  





[PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]

2024-03-12 Thread via GitHub


artemlivshits opened a new pull request, #15524:
URL: https://github.com/apache/kafka/pull/15524

   …tate
   
   Now the removal of entries from the transactionsWithPendingMarkers map 
checks the value and all pending marker operations keep the value along with 
the operation state.  This way, the pending marker operation can only delete 
the state it created and wouldn't accidentally delete the state from a 
different epoch (which could lead to "stuck" transactions).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-12 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521937262


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -210,12 +213,12 @@ public class MembershipManagerImpl implements 
MembershipManager {
 private final Map<Uuid, String> assignedTopicNamesCache;
 
 /**
- * Topic IDs and partitions received in the last target assignment. Items 
are added to this set
- * every time a target assignment is received. This is where the member 
collects the assignment
- * received from the broker, even though it may not be ready to fully 
reconcile due to missing
- * metadata.
+ * Topic IDs and partitions received in the last target assignment, 
together with its local epoch.
+ *
+ * This member reassigned every time a new assignment is received.

Review Comment:
   I don't quite get this sentence (was the intention just to start with 
`Reassigned every`... I guess...?)






Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-12 Thread via GitHub


chia7712 commented on code in PR #14471:
URL: https://github.com/apache/kafka/pull/14471#discussion_r1521936219


##
build.gradle:
##
@@ -1999,10 +1999,14 @@ project(':tools') {
 implementation project(':clients')
 implementation project(':storage')
 implementation project(':server-common')
+implementation project(':server')

Review Comment:
   Is this dependency necessary? We don't expect the tools module to depend on
   the core module.






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-12 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521933499


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -889,43 +914,36 @@ private void transitionToStale() {
  */
 void maybeReconcile() {
 if (targetAssignmentReconciled()) {
-log.debug("Ignoring reconciliation attempt. Target assignment is 
equal to the " +
+log.trace("Ignoring reconciliation attempt. Target assignment is 
equal to the " +
 "current assignment.");
 return;
 }
 if (reconciliationInProgress) {
-log.debug("Ignoring reconciliation attempt. Another reconciliation 
is already in progress. Assignment " +
+log.trace("Ignoring reconciliation attempt. Another reconciliation 
is already in progress. Assignment " +
 currentTargetAssignment + " will be handled in the next 
reconciliation loop.");
 return;
 }
 
 // Find the subset of the target assignment that can be resolved to 
topic names, and trigger a metadata update
 // if some topic IDs are not resolvable.
 SortedSet<TopicIdPartition> assignedTopicIdPartitions = 
findResolvableAssignmentAndTriggerMetadataUpdate();
+final LocalAssignment resolvedAssignment = new 
LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
 
-SortedSet<TopicPartition> ownedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
-ownedPartitions.addAll(subscriptions.assignedPartitions());
-
-// Keep copy of assigned TopicPartitions created from the 
TopicIdPartitions that are
-// being reconciled. Needed for interactions with the centralized 
subscription state that
-// does not support topic IDs yet, and for the callbacks.
-SortedSet<TopicPartition> assignedTopicPartitions = 
toTopicPartitionSet(assignedTopicIdPartitions);
-
-// Check same assignment. Based on topic names for now, until topic 
IDs are properly
-// supported in the centralized subscription state object. Note that 
this check is
-// required to make sure that reconciliation is not triggered if the 
assignment ready to
-// be reconciled is the same as the current one (even though the 
member may remain
-// in RECONCILING state if it has some unresolved assignments).
-boolean sameAssignmentReceived = 
assignedTopicPartitions.equals(ownedPartitions);
-
-if (sameAssignmentReceived) {
+if (resolvedAssignment.equals(currentAssignment)) {

Review Comment:
   This short-circuits the reconciliation if the same assignment is received
   (same epoch, same partitions). But we also need to consider the case where
   we get the same partitions assigned in a different epoch. In that case, we
   should not carry on with the full reconciliation process (there is truly
   nothing to reconcile), but we should send an ack to the broker, so I would
   expect a similar short-circuit:
   `if sameAssignmentInDiffEpoch => transitionToAck & return;`.
   
   I'm mainly thinking about this case:
   - member owns t1-1 at epoch 3
   - receives new assignment [t1-1, t2-1] at epoch 4 and gets stuck
     reconciling, e.g. because t2 metadata is missing
   - receives new assignment [t1-1] at epoch 5 (e.g. the broker realized t2
     has been deleted)
   - member does not need to reconcile t1-1, but should send an ack to the
     broker with t1-1, which it received on a newer epoch

   Does that make sense? I'm not sure whether I'm missing something regarding
   the expectations.
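
   The three outcomes discussed in this comment can be sketched in Java as
   follows. This is an illustration under stated assumptions, not the
   `MembershipManagerImpl` logic: `ReconcileDecisionSketch`, `Action`,
   `decide`, and `setOf` are hypothetical names, partitions are plain strings,
   and partially resolved assignments (the epoch 4 step) are not modelled.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

final class ReconcileDecisionSketch {
    enum Action { SKIP, ACK_ONLY, RECONCILE }

    // Hypothetical helper: compares what the member owns with the resolved target.
    static Action decide(int ownedEpoch, Set<String> owned, int targetEpoch, Set<String> target) {
        boolean samePartitions = owned.equals(target);
        if (samePartitions && ownedEpoch == targetEpoch) {
            return Action.SKIP;      // same epoch, same partitions: nothing to do
        }
        if (samePartitions) {
            return Action.ACK_ONLY;  // same partitions, different epoch: ack without reconciling
        }
        return Action.RECONCILE;     // partitions differ: full reconciliation
    }

    static Set<String> setOf(String... partitions) {
        return new TreeSet<>(Arrays.asList(partitions));
    }

    public static void main(String[] args) {
        Set<String> owned = setOf("t1-1"); // owned at epoch 3
        System.out.println(decide(3, owned, 3, setOf("t1-1")));
        System.out.println(decide(3, owned, 5, setOf("t1-1")));
        System.out.println(decide(3, owned, 5, setOf("t1-1", "t2-1")));
    }
}
```

   With the inputs in `main`, the sketch prints `SKIP`, `ACK_ONLY`, and
   `RECONCILE`; the middle line corresponds to the epoch 5 step in the list
   above.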
   






Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-12 Thread via GitHub


VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1521827156


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
 try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<V1, V2>> it = store.all()) {
 TimestampedKeyAndJoinSide<K> prevKey = null;
 
+boolean outerJoinLeftBreak = false;
+boolean outerJoinRightBreak = false;
 while (it.hasNext()) {
-boolean outerJoinLeftBreak = false;
-boolean outerJoinRightBreak = false;

Review Comment:
   
   Currently, the while-loop in emitNonJoinedOuterRecords() iterates over
   ALL the left- and right-side outerJoinRecords available in the
   outerjoin-store until it hits the **break;**.
   
   The idea of the two outerJoinBreak flags was to track the point at which
   the windows of the outerJoinRecords are no longer closed, but this is only
   useful if the KeyValueIterator is ordered by timestamped key, and it is
   not:
   
   The documentation says that the ordering of the KeyValueIterator is NOT
   guaranteed.
   
   So I now think we should remove the outerJoinBreak flags and simply check,
   for each outerJoinRecord, whether it belongs to a closed window, without
   any optimization.
   If the window has closed, we can emit a nullJoinedValue. If the window has
   not closed yet, we can continue with the next outerJoinRecord.
   
   What do you think?  @mjsax @florin-akermann 
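
   The per-record check proposed above could look roughly like the Java sketch
   below. It is a simplification under assumptions: the store is a plain
   `Map` from key to record timestamp, `windowCloseMs` is a hypothetical
   single value standing in for the join window plus grace period, and
   `OuterJoinEmitSketch` and `emitClosed` are made-up names, not Kafka Streams
   APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class OuterJoinEmitSketch {
    // Returns the keys whose window has closed. Entries are visited in whatever
    // order the store yields them, so no early break is possible.
    static List<String> emitClosed(Map<String, Long> storeTimestamps, long streamTime, long windowCloseMs) {
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : storeTimestamps.entrySet()) {
            boolean windowClosed = entry.getValue() + windowCloseMs < streamTime;
            if (!windowClosed) {
                continue; // window still open: skip this record and keep scanning
            }
            emitted.add(entry.getKey()); // window closed: emit the non-joined record
        }
        return emitted;
    }

    public static void main(String[] args) {
        Map<String, Long> store = new LinkedHashMap<>();
        store.put("a", 100L);
        store.put("b", 900L); // out of timestamp order on purpose
        store.put("c", 200L);
        System.out.println(emitClosed(store, 1000L, 500L));
    }
}
```

   With the sample data, `main` prints `[a, c]`: record `b` is skipped because
   its window is still open, and record `c`, which comes after it in
   iteration order, is still emitted. A break at `b` would have missed it.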






[jira] [Updated] (KAFKA-15250) ConsumerNetworkThread is running tight loop

2024-03-12 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15250:
--
Summary: ConsumerNetworkThread is running tight loop  (was: 
DefaultBackgroundThread is running tight loop)

> ConsumerNetworkThread is running tight loop
> ---
>
> Key: KAFKA-15250
> URL: https://issues.apache.org/jira/browse/KAFKA-15250
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, events
> Fix For: 3.8.0
>
>
> The DefaultBackgroundThread is running tight loops and wasting CPU cycles.  I 
> think we need to reexamine the timeout passed to networkclientDelegate.poll.





Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-12 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1521780875


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1007,7 +1010,8 @@ class ReplicaManager(val config: KafkaConfig,
   transactionalId,
   producerId,
   producerEpoch,
-  generalizedCallback
+  generalizedCallback,
+  ApiKeys.PRODUCE.latestVersion

Review Comment:
   This will be done as a follow up and part of KIP-890 part 2
   cc: @CalvinConfluent who was also thinking about this part






Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-12 Thread via GitHub


sjhajharia commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1521767609


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1007,7 +1010,8 @@ class ReplicaManager(val config: KafkaConfig,
   transactionalId,
   producerId,
   producerEpoch,
-  generalizedCallback
+  generalizedCallback,
+  ApiKeys.PRODUCE.latestVersion

Review Comment:
   So for this particular path it would be 0 (default),
   and for the new workflow we added it would be 1 (new error).
   Where would we use the third case, i.e. adding partitions rather than
   verifying?






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-12 Thread via GitHub


lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521750747


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -508,9 +508,30 @@ private void 
processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign
  */
 private void replaceTargetAssignmentWithNewAssignment(
 ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-currentTargetAssignment.clear();
+

Review Comment:
   1. I did not do it, because the `LocalAssignment` is leaked from this file 
via the `currentAssignment` method, and I didn't necessarily want to put so 
much logic in the public interface `MembershipManager`. However, I think I 
could possibly define the return value of `currentAssignment` by a light 
interface, and then put the fat class with all the updating logic in here. I'll 
give it a try. 
   2. I wouldn't necessarily call it `EMPTY` (to avoid confusion with an empty 
assignment), but rather `NONE` or something, but other than that it sounds like 
a good idea.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
 }
 });
 
-// RebalanceTimeoutMs - only sent if has changed since the last 
heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining or if has changed 
since the last heartbeat
+if (membershipManager.memberEpoch() == 0 || 
sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {

Review Comment:
   true, this can be simplified



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -566,18 +567,18 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
 
 // ClientAssignors - not supported yet
 
-// TopicPartitions - only sent if it has changed since the last 
heartbeat. Note that
-// the string consists of just the topic ID and the partitions. 
When an assignment is
-// received, we might not yet know the topic name, and then it is 
learnt subsequently
-// by a metadata update.
-TreeSet<String> assignedPartitions = 
membershipManager.currentAssignment().entrySet().stream()
-.map(entry -> entry.getKey() + "-" + entry.getValue())
-.collect(Collectors.toCollection(TreeSet::new));
-if (!assignedPartitions.equals(sentFields.topicPartitions)) {
+// TopicPartitions - sent with the first heartbeat after a new 
assignment from the server was
+// reconciled. This is ensured by resending the topic partitions 
whenever the local assignment,
+// including its local epoch is changed (although the local epoch 
is not sent in the heartbeat).
+LocalAssignment local = membershipManager.currentAssignment();
+if (local == null) {
+data.setTopicPartitions(Collections.emptyList());
+sentFields.topicPartitions = null;
+} else if (!local.equals(sentFields.topicPartitions)) {

Review Comment:
   That's what is being done. Maybe I should have renamed the field 
`sentFields.topicPartitions` to `sentFields.localAssignment`, but the type is 
now `LocalAssignment` and the `equals` compares the local epoch as well.






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-12 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521745679


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -508,9 +508,30 @@ private void 
processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign
  */
 private void replaceTargetAssignmentWithNewAssignment(
 ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-currentTargetAssignment.clear();
+
+// Return if the same as current assignment; comparison without 
creating a new collection
+if (currentTargetAssignment != null) {
+// check if the new assignment is different from the current 
target assignment
+if (currentTargetAssignment.partitions.size() == 
assignment.topicPartitions().size() &&
+assignment.topicPartitions().stream().allMatch(
+tp -> 
currentTargetAssignment.partitions.containsKey(tp.topicId()) &&
+
currentTargetAssignment.partitions.get(tp.topicId()).size() == 
tp.partitions().size() &&
+
currentTargetAssignment.partitions.get(tp.topicId()).containsAll(tp.partitions()))) {
+return;
+}
+}
+
+// Bump local epoch and replace assignment
+long nextLocalEpoch;
+if (currentTargetAssignment == null) {
+nextLocalEpoch = 0;
+} else {
+nextLocalEpoch = currentTargetAssignment.localEpoch + 1;
+}

Review Comment:
   Is there a reason why we need to compute epochs on the client here? The 
server is the one bumping the epochs whenever it computes a new assignment for 
a member. I was expecting that we just keep a `LocalAssignment` that will 
contain the partitions and epoch the broker sent on the 
`ConsumerGroupHeartbeatResponseData`






Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-12 Thread via GitHub


ChrisAHolland commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1521727181


##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
 private final int maxLength;
 private final List<E> underlying;
 
-public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-return new BoundedList<>(maxLength, new ArrayList<>());
-}
-
-public static <E> BoundedList<E> newArrayBacked(int maxLength, int 
initialCapacity) {

Review Comment:
   Looking at the usage here, I can't see a reason why this would be
   desirable. Maybe the intention was to have the option to save some space
   until absolutely necessary? As I mentioned in the PR description, though,
   this is not worth the resizing costs. Also, this method is guaranteed to
   add more records to the list than `ids.size()`, as it adds elements in
   multiple places.






Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-12 Thread via GitHub


ChrisAHolland commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1521713491


##
server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java:
##
@@ -35,24 +35,12 @@ public class BoundedList<E> implements List<E> {
 private final int maxLength;
 private final List<E> underlying;
 
-public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-return new BoundedList<>(maxLength, new ArrayList<>());
-}
-
-public static <E> BoundedList<E> newArrayBacked(int maxLength, int 
initialCapacity) {
-return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity));
-}
-
-public BoundedList(int maxLength, List<E> underlying) {

Review Comment:
   I considered this at first, but then I realized that this is only ever
   used in unit tests. Because of that, I decided to remove it and have the
   unit tests match how the class is used in the source code.






Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-12 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1521679508


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1007,7 +1010,8 @@ class ReplicaManager(val config: KafkaConfig,
   transactionalId,
   producerId,
   producerEpoch,
-  generalizedCallback
+  generalizedCallback,
+  ApiKeys.PRODUCE.latestVersion

Review Comment:
   What if we send an offset commit request that can't yet handle the error?
   I think we may need a different data structure. Maybe something like an
   enum to handle the various versions:
   0 -- default, 1 -- new error, 2 -- add partitions rather than verifying.
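
   A rough Java sketch of the enum idea, to make the three cases concrete.
   All names here (`OperationModeSketch`, `OperationMode`, `fromCode`,
   `modeFor`) are hypothetical and only mirror the 0/1/2 list above; they are
   not the names used in the PR, whose code is Scala.

```java
final class OperationModeSketch {
    // Hypothetical names; the numeric codes follow the 0/1/2 list in the comment.
    enum OperationMode {
        DEFAULT(0),        // requester cannot handle the new error
        NEW_ERROR(1),      // requester can handle the new error
        ADD_PARTITIONS(2); // add partitions rather than verifying

        final int code;

        OperationMode(int code) {
            this.code = code;
        }

        static OperationMode fromCode(int code) {
            for (OperationMode mode : values()) {
                if (mode.code == code) {
                    return mode;
                }
            }
            throw new IllegalArgumentException("Unknown operation mode: " + code);
        }
    }

    // Callers state what the requester can handle instead of passing a produce version.
    static OperationMode modeFor(boolean requesterHandlesNewError) {
        return requesterHandlesNewError ? OperationMode.NEW_ERROR : OperationMode.DEFAULT;
    }

    public static void main(String[] args) {
        System.out.println(modeFor(false));            // e.g. an older offset commit request
        System.out.println(modeFor(true));
        System.out.println(OperationMode.fromCode(2));
    }
}
```

   The design point is that an explicit mode decouples the verification path
   from `ApiKeys.PRODUCE.latestVersion`, so a non-produce caller can pass
   `DEFAULT` without pretending to be a produce request.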






Re: [PR] KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test [kafka]

2024-03-12 Thread via GitHub


kamalcph commented on PR #15523:
URL: https://github.com/apache/kafka/pull/15523#issuecomment-1991918079

   > Thanks for the change, @kamalcph! Just for my knowledge, why is there a 
possibility that the Shutdownable thread never ran in the CI for this test?
   
   Due to resource constraints, `ShutdownableThread#run` might be called later
   than the `ShutdownableThread#close` method.





Re: [PR] KAFKA-15949: Unify metadata.version format in log and error message [kafka]

2024-03-12 Thread via GitHub


FrankYang0529 commented on code in PR #15505:
URL: https://github.com/apache/kafka/pull/15505#discussion_r1521640919


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -139,7 +139,7 @@ object StorageTool extends Logging {
   action(storeTrue())
 formatParser.addArgument("--release-version", "-r").
   action(store()).
-  help(s"A KRaft release version to use for the initial metadata version. 
The minimum is 3.0, the default is 
${MetadataVersion.LATEST_PRODUCTION.version()}")
+  help(s"A KRaft release version to use for the initial metadata.version. 
The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is 
${MetadataVersion.LATEST_PRODUCTION.version()}")

Review Comment:
   Hi @showuon, thanks for the review. Using `MetadataVersion.IBP_3_0_IV0` or
   `MetadataVersion.IBP_3_0_IV0.version()` gives the same result. Both the
   `toString` and `version` functions return `ibpVersion`:
   
   
https://github.com/apache/kafka/blob/58ddd693e69599b177d09c2e384f31e7f5e11171/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L523-L525
   
https://github.com/apache/kafka/blob/58ddd693e69599b177d09c2e384f31e7f5e11171/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L632-L634






Re: [PR] KAFKA-16365: AssignmentsManager callback handling issues [kafka]

2024-03-12 Thread via GitHub


soarez commented on code in PR #15521:
URL: https://github.com/apache/kafka/pull/15521#discussion_r1521634948


##
server/src/main/java/org/apache/kafka/server/AssignmentsManager.java:
##
@@ -202,9 +199,12 @@ public void run() throws Exception {
 return;
 }
 if (existing.timestampNs > timestampNs) {
-existing.onComplete();
-if (log.isDebugEnabled()) log.debug("Dropping assignment 
{} because it's older than {}", this, existing);
+existing.merge(this);
+if (log.isDebugEnabled()) log.debug("Dropping assignment 
{} because it's older than existing {}", this, existing);
 return;
+} else {
+this.merge(existing);

Review Comment:
   Good point. Addressed in 
[3ebe6e1](https://github.com/apache/kafka/pull/15521/commits/3ebe6e12428dfe7b45d9bb1270c4e38e50eb19ab)






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-12 Thread via GitHub


lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521632809


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
 }
 });
 
-// RebalanceTimeoutMs - only sent if has changed since the last 
heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining or if has changed 
since the last heartbeat
+if (membershipManager.memberEpoch() == 0 || 
sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {

Review Comment:
   It is static, taken from `max.poll.interval.ms`, so I agree we could set it
   only when epoch == 0.






Re: [PR] KAFKA-16365: AssignmentsManager callback handling issues [kafka]

2024-03-12 Thread via GitHub


gaurav-narula commented on code in PR #15521:
URL: https://github.com/apache/kafka/pull/15521#discussion_r1521612209


##
server/src/main/java/org/apache/kafka/server/AssignmentsManager.java:
##
@@ -202,9 +199,12 @@ public void run() throws Exception {
 return;
 }
 if (existing.timestampNs > timestampNs) {
-existing.onComplete();
-if (log.isDebugEnabled()) log.debug("Dropping assignment 
{} because it's older than {}", this, existing);
+existing.merge(this);
+if (log.isDebugEnabled()) log.debug("Dropping assignment 
{} because it's older than existing {}", this, existing);

Review Comment:
   I think we can avoid `log.isDebugEnabled()` and the like when using
   parameterised messages. See https://www.slf4j.org/faq.html#logging_performance
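
   To illustrate why the guard is redundant, here is a small self-contained
   Java sketch. It does not use SLF4J: `MiniLogger` is a hypothetical stand-in
   that mimics the one relevant behaviour of a parameterised logger, namely
   that arguments are only turned into strings once the level check has
   passed. `LazyLogSketch` and `Assignment` are likewise made up for the demo.

```java
import java.util.regex.Matcher;

final class LazyLogSketch {
    static int toStringCalls = 0;

    // Counts how often it is stringified, to make deferred formatting visible.
    static final class Assignment {
        @Override
        public String toString() {
            toStringCalls++;
            return "Assignment";
        }
    }

    // Stand-in for a parameterised logger (not SLF4J itself).
    static final class MiniLogger {
        final boolean debugEnabled;
        final StringBuilder out = new StringBuilder();

        MiniLogger(boolean debugEnabled) {
            this.debugEnabled = debugEnabled;
        }

        void debug(String format, Object... args) {
            if (!debugEnabled) {
                return; // level disabled: arguments are never stringified
            }
            String message = format;
            for (Object arg : args) {
                message = message.replaceFirst("\\{\\}", Matcher.quoteReplacement(String.valueOf(arg)));
            }
            out.append(message).append('\n');
        }
    }

    public static void main(String[] args) {
        MiniLogger disabled = new MiniLogger(false);
        disabled.debug("Dropping assignment {} because it's older than existing {}",
                new Assignment(), new Assignment());
        System.out.println(toStringCalls); // no toString() calls without any explicit guard

        MiniLogger enabled = new MiniLogger(true);
        enabled.debug("Dropping assignment {} because it's older than existing {}",
                new Assignment(), new Assignment());
        System.out.print(enabled.out);
    }
}
```

   The first call performs no `toString()` work even though the caller wrote
   no `isDebugEnabled()` check, which is the saving the guard was meant to
   provide.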



##
server/src/main/java/org/apache/kafka/server/AssignmentsManager.java:
##
@@ -202,9 +199,12 @@ public void run() throws Exception {
 return;
 }
 if (existing.timestampNs > timestampNs) {
-existing.onComplete();
-if (log.isDebugEnabled()) log.debug("Dropping assignment 
{} because it's older than {}", this, existing);
+existing.merge(this);
+if (log.isDebugEnabled()) log.debug("Dropping assignment 
{} because it's older than existing {}", this, existing);
 return;
+} else {
+this.merge(existing);

Review Comment:
   Doesn't this result in the callback handlers for existing being run twice if 
existing was retrieved from inflight?






Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-12 Thread via GitHub


dajac commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1521552751


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
 }
 });
 
-// RebalanceTimeoutMs - only sent if has changed since the last 
heartbeat
-if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+// RebalanceTimeoutMs - only sent when joining or if has changed 
since the last heartbeat
+if (membershipManager.memberEpoch() == 0 || 
sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {

Review Comment:
   If I'm not mistaken, `rebalanceTimeoutMs` is a static config, so we could
   actually set it only when joining the group (when epoch == 0).



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -508,9 +508,30 @@ private void 
processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assign
  */
 private void replaceTargetAssignmentWithNewAssignment(
 ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-currentTargetAssignment.clear();
+

Review Comment:
   I have two high-level thoughts, but I am not sure whether they are worth it:
   1) Have you considered moving all the update logic into `LocalAssignment`? 
We could have a method such as `updateWith(Assignment)` which returns an 
`Optional` containing the new assignment if it was updated.
   2) Along similar lines, I wonder if we could have an `EMPTY` constant for the 
default `LocalAssignment(-1, null)` instead of relying on `null`. The reasoning 
for this one is that it avoids having to deal with `null` in a few places in 
this file.
   What do you think?
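
   For illustration only, the two suggestions could look roughly like the 
sketch below. It is not code from the PR: the class name 
`LocalAssignmentSketch`, the use of plain strings for topic IDs, the empty map 
in `EMPTY` (instead of `null`), and the "bump the local epoch by one" rule are 
all assumptions.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;

// Hypothetical stand-in for the PR's LocalAssignment; topic IDs are plain strings here.
final class LocalAssignmentSketch {
    // Shared "no assignment yet" value, so callers never need a null check (assumption).
    static final LocalAssignmentSketch EMPTY =
        new LocalAssignmentSketch(-1, Collections.emptyMap());

    final long localEpoch;
    final Map<String, SortedSet<Integer>> partitions;

    LocalAssignmentSketch(long localEpoch, Map<String, SortedSet<Integer>> partitions) {
        this.localEpoch = localEpoch;
        this.partitions = partitions;
    }

    // Returns the new assignment only if the partitions actually changed.
    Optional<LocalAssignmentSketch> updateWith(Map<String, SortedSet<Integer>> assignment) {
        if (partitions.equals(assignment)) {
            return Optional.empty();
        }
        // Assumed rule: every accepted change bumps the local epoch by one.
        return Optional.of(new LocalAssignmentSketch(localEpoch + 1, assignment));
    }
}
```

   With this shape, a caller can write 
`current.updateWith(target).ifPresent(next -> current = next)` without ever 
comparing against `null`.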



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -566,18 +567,18 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
 
 // ClientAssignors - not supported yet
 
-// TopicPartitions - only sent if it has changed since the last 
heartbeat. Note that
-// the string consists of just the topic ID and the partitions. 
When an assignment is
-// received, we might not yet know the topic name, and then it is 
learnt subsequently
-// by a metadata update.
-TreeSet assignedPartitions = 
membershipManager.currentAssignment().entrySet().stream()
-.map(entry -> entry.getKey() + "-" + entry.getValue())
-.collect(Collectors.toCollection(TreeSet::new));
-if (!assignedPartitions.equals(sentFields.topicPartitions)) {
+// TopicPartitions - sent with the first heartbeat after a new 
assignment from the server was
+// reconciled. This is ensured by resending the topic partitions 
whenever the local assignment,
+// including its local epoch is changed (although the local epoch 
is not sent in the heartbeat).
+LocalAssignment local = membershipManager.currentAssignment();
+if (local == null) {
+data.setTopicPartitions(Collections.emptyList());
+sentFields.topicPartitions = null;
+} else if (!local.equals(sentFields.topicPartitions)) {

Review Comment:
   Don't we need to also take the assignment epoch into consideration here? In 
other words, should we store the LocalAssignment in `sentFields` and use it to 
do the comparison?
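
   As a minimal sketch of the question above (hypothetical names, not the PR's 
code): if the last sent value keeps the local epoch alongside the partitions, 
an epoch bump with an unchanged partition set still triggers a resend. 
`SentFieldsSketch`, `Assignment`, and `shouldSendTopicPartitions` are 
assumptions made for this example.

```java
import java.util.Objects;
import java.util.Set;

final class SentFieldsSketch {
    // Hypothetical value type: equality covers the epoch as well as the partitions.
    static final class Assignment {
        final long localEpoch;
        final Set<String> topicPartitions;

        Assignment(long localEpoch, Set<String> topicPartitions) {
            this.localEpoch = localEpoch;
            this.topicPartitions = topicPartitions;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Assignment)) return false;
            Assignment other = (Assignment) o;
            return localEpoch == other.localEpoch
                && topicPartitions.equals(other.topicPartitions);
        }

        @Override
        public int hashCode() {
            return Objects.hash(localEpoch, topicPartitions);
        }
    }

    // Last assignment included in a heartbeat; null until the first one is sent.
    private Assignment lastSent;

    // True when the heartbeat should carry the topic partitions again.
    boolean shouldSendTopicPartitions(Assignment current) {
        if (Objects.equals(current, lastSent)) {
            return false;
        }
        lastSent = current;
        return true;
    }
}
```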






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-12 Thread via GitHub


dajac commented on PR #15364:
URL: https://github.com/apache/kafka/pull/15364#issuecomment-1991716363

   @jolshan Thanks for your comments. I addressed them.





Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-12 Thread via GitHub


dajac commented on PR #15364:
URL: https://github.com/apache/kafka/pull/15364#issuecomment-1991715637

   > > This is a non-backward compatible change. I think that we should do this 
change to clean up the record. As KIP-848 is only in early access in 3.7 and 
we clearly state that we don't plan to support upgrades from it, this is 
acceptable in my opinion.
   > 
   > Are we gating this record change under anything?
   
   There is no gating at all for the records at the moment besides the static 
configuration to enable the protocol. We will introduce the proper gating based 
on the new feature version for 3.8. With those changes, we will clearly break 
backward compatibility but, as you said, we said that upgrades won't be 
supported from the EA. I actually thought hard about not doing this, but I think 
that it is preferable to do it in order to not carry a bad record format from 
the start.





Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]

2024-03-12 Thread via GitHub


OmniaGM commented on code in PR #15506:
URL: https://github.com/apache/kafka/pull/15506#discussion_r1521506327


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##
@@ -1215,17 +1195,13 @@ public void testSuppressCloseErrors() {
 
 workerTask.initialize(TASK_CONFIG);
 workerTask.initializeAndStart();
-try {
-workerTask.execute();
-fail("workerTask.execute should have thrown an exception");
-} catch (ConnectException e) {
-assertSame("Exception from put should be the cause", putException, 
e.getCause());
-assertTrue("Exception from close should be suppressed", 
e.getSuppressed().length > 0);
-assertSame(closeException, e.getSuppressed()[0]);
-}
+
+Throwable thrownException = assertThrows(ConnectException.class, () -> 
workerTask.execute());
+assertSame("Exception from put should be the cause", putException, 
thrownException.getCause());

Review Comment:
   `assertSame` and `assertEquals` behave the same when the default `equals` 
implementation (which compares references) is in use, but I'm curious why we 
are opting for `assertSame` over `assertEquals`. Do we need the test to 
explicitly compare object references, or just values?






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-12 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1521508636


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -931,17 +920,14 @@ public void testGroupEpochBumpWhenNewStaticMemberJoins() {
 
 ConsumerGroupMember expectedMember3 = new 
ConsumerGroupMember.Builder(memberId3)
 .setMemberEpoch(11)
+.setState(MemberState.UNRELEASED_PARTITIONS)
 .setInstanceId(memberId3)
 .setPreviousMemberEpoch(0)
-.setTargetMemberEpoch(11)
 .setClientId("client")
 .setClientHost("localhost/127.0.0.1")
 .setRebalanceTimeoutMs(5000)
 .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
 .setServerAssignorName("range")
-.setPartitionsPendingAssignment(mkAssignment(

Review Comment:
   It is a matter of the difference between the owned partitions and the 
assignment. The field was not really used in the end so I thought that it is 
better to remove it to save space.






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-12 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1521507018


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -338,7 +338,6 @@ public void testConsumerGroupMemberEpochValidation() {
 ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
 .setMemberEpoch(100)
 .setPreviousMemberEpoch(99)
-.setTargetMemberEpoch(100)

Review Comment:
   I think that it makes sense to add it. I reviewed all the cases in the file 
and added where it makes sense.






Re: [PR] MINOR: simplify consumer logic [kafka]

2024-03-12 Thread via GitHub


lianetm commented on code in PR #15519:
URL: https://github.com/apache/kafka/pull/15519#discussion_r1521497910


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -529,10 +529,9 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
+// InstanceId - send when leaving the group as a static member
 membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-if (!groupInstanceId.equals(sentFields.instanceId) || 
membershipManager.memberEpoch() == 
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+if (membershipManager.memberEpoch() == 
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
 data.setInstanceId(groupInstanceId);
 sentFields.instanceId = groupInstanceId;

Review Comment:
   Agreed, we could simplify and remove the `instanceId` from the `sentFields` 
as @chia7712 suggested. I don't see the need to track it, as it's not 
something that will change and we simply need to send the value we have. 



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -529,10 +529,9 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
+// InstanceId - send when leaving the group as a static member
 membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {

Review Comment:
   Agree, the current coordinator expectation seems to me like the right thing 
to do, and we should send the `groupInstanceId` all the time (in the same way 
that we send the `memberId` all the time). 






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-12 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1521495237


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/MemberState.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The various states that a member can be in. For their definition,
+ * refer to the documentation of {{@link CurrentAssignmentBuilder}}.

Review Comment:
   The doc is in the code now.






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-12 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1521494464


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -170,72 +127,122 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions(
  * @return A new ConsumerGroupMember or the current one.
  */
 public ConsumerGroupMember build() {
-// A new target assignment has been installed, we need to restart
-// the reconciliation loop from the beginning.
-if (targetAssignmentEpoch != member.targetMemberEpoch()) {
-return transitionToNewTargetAssignmentState();
-}
-
 switch (member.state()) {
-// Check if the partitions have been revoked by the member.
-case REVOKING:
-return maybeTransitionFromRevokingToAssigningOrStable();
+case STABLE:
+// When the member is in the STABLE state, we verify if a newer
+// epoch (or target assignment) is available. If it is, we can
+// reconcile the member towards it. Otherwise, we return.
+if (member.memberEpoch() != targetAssignmentEpoch) {
+return computeNextAssignment(
+member.memberEpoch(),
+member.assignedPartitions()
+);
+} else {
+return member;
+}
 
-// Check if pending partitions have been freed up.
-case ASSIGNING:
-return maybeTransitionFromAssigningToAssigningOrStable();
+case UNREVOKED_PARTITIONS:
+// When the member is in the UNREVOKED_PARTITIONS state, we 
wait
+// until the member has revoked the necessary partitions. They 
are
+// considered revoked when they are not anymore reported in the
+// owned partitions set in the ConsumerGroupHeartbeat API.
 
-// Nothing to do.
-case STABLE:
-return member;
+// If the member does not provide its owned partitions. We 
cannot
+// progress.
+if (ownedTopicPartitions == null) {
+return member;
+}
+
+// If the member provides its owned partitions. We verify if 
it still
+// owns any of the revoked partitions. If it does, we cannot 
progress.
+for (ConsumerGroupHeartbeatRequestData.TopicPartitions 
topicPartitions : ownedTopicPartitions) {
+for (Integer partitionId : topicPartitions.partitions()) {
+boolean stillHasRevokedPartition = member
+.partitionsPendingRevocation()
+.getOrDefault(topicPartitions.topicId(), 
Collections.emptySet())
+.contains(partitionId);
+if (stillHasRevokedPartition) {
+return member;
+}
+}
+}
+
+// When the member has revoked all the pending partitions, it 
can
+// transition to the next epoch (current + 1) and we can 
reconcile
+// its state towards the latest target assignment.
+return computeNextAssignment(
+member.memberEpoch() + 1,
+member.assignedPartitions()
+);
+
+case UNRELEASED_PARTITIONS:
+// When the member is in the UNRELEASED_PARTITIONS, we 
reconcile the
+// member towards the latest target assignment. This will 
assign any
+// of the unreleased partitions when they become available.
+return computeNextAssignment(
+member.memberEpoch(),
+member.assignedPartitions()
+);
+
+case UNKNOWN:
+// We could only end up in this state if a new state is added 
in the
+// future and the group coordinator is downgraded. In this 
case, the
+// best option is to fence the member to force it to rejoin 
the group
+// without any partitions and to reconcile it again from 
scratch.
+if (ownedTopicPartitions == null || 
!ownedTopicPartitions.isEmpty()) {
+throw new FencedMemberEpochException("The consumer group 
member is in a unknown state. "
++ "The member must abandon all its partitions and 
rejoin.");
+}
+
+return computeNextAssignment(

Review Comment:
   We would only hit this case if we ever introduce a new state in the future. 
I was trying to ensure that we would handle it somehow. My proposal here is to 
return the fenced member epoch error in order to force the 

Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-12 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1521491629


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -33,49 +33,6 @@
  * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
  * consumer group protocol. Given the current state of a member and a desired 
or target
  * assignment state, the state machine takes the necessary steps to converge 
them.
- *
- * The member state has the following properties:
- * - Current Epoch:

Review Comment:
   Indeed. I moved the description closer to the code.






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-12 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1521490928


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -528,27 +478,6 @@ public Map> 
partitionsPendingRevocation() {
 return partitionsPendingRevocation;
 }
 
-/**
- * @return The set of partitions awaiting assignment to the member.
- */
-public Map> partitionsPendingAssignment() {
-return partitionsPendingAssignment;
-}
-
-/**
- * @return A string representation of the current assignment state.
- */
-public String currentAssignmentSummary() {

Review Comment:
   Correct.






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-12 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1521488580


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1796,12 +1834,12 @@ public void onLoaded() {
 consumerGroup.members().forEach((memberId, member) -> {
 log.debug("Loaded member {} in consumer group {}.", 
memberId, groupId);
 scheduleConsumerGroupSessionTimeout(groupId, memberId);
-if (member.state() == 
ConsumerGroupMember.MemberState.REVOKING) {
-scheduleConsumerGroupRevocationTimeout(
+if (member.state() == 
MemberState.UNREVOKED_PARTITIONS) {
+scheduleConsumerGroupRebalanceTimeout(
 groupId,
-memberId,
-member.rebalanceTimeoutMs(),
-member.memberEpoch()
+member.memberId(),

Review Comment:
   I am actually not sure why I changed it, but they both have the same value.






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-12 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1521487635


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1400,35 +1440,35 @@ private void cancelConsumerGroupSessionTimeout(
 }
 
 /**
- * Schedules a revocation timeout for the member.
+ * Schedules a rebalance timeout for the member.
  *
  * @param groupId   The group id.
  * @param memberId  The member id.
- * @param revocationTimeoutMs   The revocation timeout.
- * @param expectedMemberEpoch   The expected member epoch.
+ * @param memberEpoch   The member epoch.
+ * @param rebalanceTimeoutMsThe rebalance timeout.
  */
-private void scheduleConsumerGroupRevocationTimeout(
+private void scheduleConsumerGroupRebalanceTimeout(
 String groupId,
 String memberId,
-long revocationTimeoutMs,
-int expectedMemberEpoch
+int memberEpoch,
+int rebalanceTimeoutMs
 ) {
-String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
-timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
+String key = consumerGroupRebalanceTimeoutKey(groupId, memberId);
+timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
 try {
 ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
 ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
 
-if (member.state() != ConsumerGroupMember.MemberState.REVOKING 
||
-member.memberEpoch() != expectedMemberEpoch) {
-log.debug("[GroupId {}] Ignoring revocation timeout for {} 
because the member " +
-"state does not match the expected state.", groupId, 
memberId);
+if (member.memberEpoch() == memberEpoch) {
+log.info("[GroupId {}] Member {} fenced from the group 
because " +
+"it failed to transition from epoch {} within 
{}ms.",
+groupId, memberId, memberEpoch, rebalanceTimeoutMs);
+return new 
CoordinatorResult<>(consumerGroupFenceMember(group, member));

Review Comment:
   > I may have asked this on a previous pr, but are we assuming the member 
epoch of the member (not the one passed in) is always never less than the 
member epoch passed into this method. That makes sense given the epoch is 
monotonically increasing, but just wanted to confirm.
   
   It is only newer when the member confirms that it has revoked the 
partitions. Here we are basically saying that we want a member to transition 
from its current epoch (and ack the revocation) within the rebalance timeout.
   
   > As an aside, when we fence a group member, do we basically kick it out of 
the group and force it to rejoin?
   > Can the client rejoin without restarting?
   
   On the server, we only fence the member. On the client, it will rejoin when 
it is fenced but this is a client side implementation choice.
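
   The timeout rule described above can be sketched as follows. This is a 
simplified illustration under stated assumptions, not the coordinator code: 
`RebalanceTimeoutSketch`, `Member`, and `onTimeout` are hypothetical, and the 
timer is reduced to a `Runnable` that the caller fires by hand.

```java
final class RebalanceTimeoutSketch {
    // Hypothetical, mutable stand-in for a group member.
    static final class Member {
        final String id;
        int epoch;
        boolean fenced;

        Member(String id, int epoch) {
            this.id = id;
            this.epoch = epoch;
        }
    }

    // Builds the action to run when the rebalance timeout expires. The member
    // is fenced only if it is still at the epoch it had when the timeout was
    // scheduled, i.e. it never acknowledged the revocation.
    static Runnable onTimeout(Member member, int epochWhenScheduled) {
        return () -> {
            if (member.epoch == epochWhenScheduled) {
                member.fenced = true;
            }
            // Otherwise the member already moved on, and the timeout is a no-op.
        };
    }
}
```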






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-12 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1521482723


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1211,13 +1192,99 @@ private 
CoordinatorResult consumerGr
 // 1. The member reported its owned partitions;
 // 2. The member just joined or rejoined to group (epoch equals to 
zero);
 // 3. The member's assignment has been updated.
-if (ownedTopicPartitions != null || memberEpoch == 0 || 
assignmentUpdated) {
+if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {
 response.setAssignment(createResponseAssignment(updatedMember));
 }
 
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Reconciles the current assignment of the member if needed.
+ *
+ * @param groupId   The group id.
+ * @param memberThe member to reconcile.
+ * @param currentPartitionEpoch The function returning the current epoch of
+ *  a given partition.
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment  The target assignment.
+ * @param ownedTopicPartitions  The list of partitions owned by the 
member. This
+ *  is reported in the ConsumerGroupHeartbeat 
API and
+ *  it could be null if not provided.
+ * @param records   The list to accumulate any new records.
+ * @return The received member if no changes have been made; or a new
+ * member containing the new assignment.
+ */
+private ConsumerGroupMember maybeReconcile(
+String groupId,
+ConsumerGroupMember member,
+BiFunction currentPartitionEpoch,
+int targetAssignmentEpoch,
+Assignment targetAssignment,
+List 
ownedTopicPartitions,
+List records
+) {
+if (member.isReconciledTo(targetAssignmentEpoch)) {
+return member;
+}
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+.withCurrentPartitionEpoch(currentPartitionEpoch)
+.withOwnedTopicPartitions(ownedTopicPartitions)
+.build();
+
+if (!updatedMember.equals(member)) {
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+log.info("[GroupId {}] Member {} new assignment state: epoch={}, 
previousEpoch={}, state={}, "
+ + "assignedPartitions={} and revokedPartitions={}.",
+groupId, updatedMember.memberId(), 
updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), 
updatedMember.state(),
+formatAssignment(updatedMember.assignedPartitions()), 
formatAssignment(updatedMember.revokedPartitions()));
+
+if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+scheduleConsumerGroupRebalanceTimeout(
+groupId,
+updatedMember.memberId(),
+updatedMember.memberEpoch(),
+updatedMember.rebalanceTimeoutMs()
+);
+} else {

Review Comment:
   So I use rebalance timeout in order to be consistent with the 
rebalanceTimeoutMs field. In the end, the name does not really matter here. The 
important part is how we apply it.






Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-12 Thread via GitHub


dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1521477968


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1211,13 +1192,99 @@ private 
CoordinatorResult consumerGr
 // 1. The member reported its owned partitions;

Review Comment:
   I will address this separately: 
https://issues.apache.org/jira/browse/KAFKA-16367.






[jira] [Created] (KAFKA-16367) Full ConsumerGroupHeartbeat response must be sent when full request is received

2024-03-12 Thread David Jacot (Jira)
David Jacot created KAFKA-16367:
---

 Summary: Full ConsumerGroupHeartbeat response must be sent when 
full request is received
 Key: KAFKA-16367
 URL: https://issues.apache.org/jira/browse/KAFKA-16367
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]

2024-03-12 Thread via GitHub


OmniaGM commented on code in PR #15506:
URL: https://github.com/apache/kafka/pull/15506#discussion_r1521477925


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##
@@ -580,12 +570,9 @@ public void testErrorInRebalancePartitionRevocation() {
 
 workerTask.iteration();
 verifyPollInitialAssignment();
-try {
-workerTask.iteration();
-fail("Poll should have raised the rebalance exception");
-} catch (RuntimeException e) {
-assertEquals(exception, e);
-}
+
+Throwable thrownException = assertThrows(RuntimeException.class, () -> 
workerTask.iteration());

Review Comment:
   Same here, so I am not going to point it out for each test.






Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]

2024-03-12 Thread via GitHub


OmniaGM commented on code in PR #15506:
URL: https://github.com/apache/kafka/pull/15506#discussion_r1521475624


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##
@@ -553,12 +547,8 @@ public void testErrorInRebalancePartitionLoss() {
 workerTask.iteration();
 verifyPollInitialAssignment();
 
-try {
-workerTask.iteration();
-fail("Poll should have raised the rebalance exception");
-} catch (RuntimeException e) {
-assertEquals(exception, e);
-}
+Throwable thrownException = assertThrows(RuntimeException.class, () -> 
workerTask.iteration());

Review Comment:
   The type of `thrownException` should be `RuntimeException`, since 
`assertThrows(RuntimeException.class, ...)` already returns the exception as that type.
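
   The reason no wider type is needed: JUnit's `assertThrows` is generic and 
returns the caught exception typed as the class passed in. The sketch below 
reproduces that idea in plain Java; `expectThrows` is a hypothetical helper 
written for this example, not the JUnit implementation.

```java
final class TypedAssertThrowsSketch {
    // Runs the action and returns the thrown exception, typed as T.
    static <T extends Throwable> T expectThrows(Class<T> expectedType, Runnable action) {
        try {
            action.run();
        } catch (Throwable thrown) {
            if (expectedType.isInstance(thrown)) {
                // Class.cast gives back a T without an unchecked cast.
                return expectedType.cast(thrown);
            }
            throw new AssertionError(
                "Unexpected exception type: " + thrown.getClass().getName(), thrown);
        }
        throw new AssertionError(
            "Expected " + expectedType.getName() + " but nothing was thrown");
    }
}
```

   A caller can then declare the result with the narrow type directly, for 
example `RuntimeException e = expectThrows(RuntimeException.class, task)`.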






[jira] [Updated] (KAFKA-16293) Test log directory failure in Kraft

2024-03-12 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim updated KAFKA-16293:
--
Fix Version/s: 3.7.1

> Test log directory failure in Kraft
> ---
>
> Key: KAFKA-16293
> URL: https://issues.apache.org/jira/browse/KAFKA-16293
> Project: Kafka
>  Issue Type: Test
>  Components: jbod, kraft
>Affects Versions: 3.7.0
>Reporter: Gaurav Narula
>Assignee: Gaurav Narula
>Priority: Major
> Fix For: 3.7.1
>
>
> We should update the log directory failure system test to run in Kraft mode 
> following the work for KIP 858.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16234) Log directory failure re-creates partitions in another logdir automatically

2024-03-12 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim updated KAFKA-16234:
--
Fix Version/s: 3.7.1

> Log directory failure re-creates partitions in another logdir automatically
> ---
>
> Key: KAFKA-16234
> URL: https://issues.apache.org/jira/browse/KAFKA-16234
> Project: Kafka
>  Issue Type: Sub-task
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Gaurav Narula
>Assignee: Omnia Ibrahim
>Priority: Major
> Fix For: 3.7.1
>
>
> With [KAFKA-16157|https://github.com/apache/kafka/pull/15263] we made changes 
> in {{HostedPartition.Offline}} enum variant to embed {{Partition}} object. 
> Further, {{ReplicaManager::getOrCreatePartition}} tries to compare the old 
> and new topicIds to decide if it needs to create a new log.
> The getter for {{Partition::topicId}} relies on retrieving the topicId from 
> {{log}} field or {{{}logManager.currentLogs{}}}. The former is set to 
> {{None}} when a partition is marked offline and the key for the partition is 
> removed from the latter by {{{}LogManager::handleLogDirFailure{}}}. 
> Therefore, topicId for a partition marked offline always returns {{None}} 
> and new logs for all partitions in a failed log directory are always created 
> on another disk.
> The broker will fail to restart after the failed disk is repaired because 
> the same partitions will occur in two different directories. The error does, 
> however, inform the operator to remove the partitions from the disk that 
> failed, which should help with broker startup.
> We can avoid this with KAFKA-16212 but in the short term, an immediate 
> solution can be to have the {{Partition}} object accept {{Option[TopicId]}} in 
> its constructor and have it fall back to {{log}} or {{logManager}} if it's 
> unset.
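
A rough sketch of the short-term idea in the description above, written in Java with `java.util.Optional` and `java.util.UUID` standing in for Scala's `Option` and Kafka's topic ID type. The class, field, and parameter names are hypothetical and are not taken from the Kafka code base; the supplier stands in for the existing lookup via {{log}} or {{logManager}}.

```java
import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;

// Hypothetical model of the proposal; none of these names come from the Kafka code base.
class PartitionTopicIdSketch {
    // Topic ID handed to the constructor; survives the partition being marked offline.
    private final Optional<UUID> explicitTopicId;
    // Assumed stand-in for the existing lookup through the log or the log manager.
    private final Supplier<Optional<UUID>> topicIdFromLog;

    PartitionTopicIdSketch(Optional<UUID> explicitTopicId, Supplier<Optional<UUID>> topicIdFromLog) {
        this.explicitTopicId = explicitTopicId;
        this.topicIdFromLog = topicIdFromLog;
    }

    // Prefer the explicitly supplied ID and fall back to the log-based lookup only if it is unset.
    Optional<UUID> topicId() {
        return explicitTopicId.isPresent() ? explicitTopicId : topicIdFromLog.get();
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        // Offline partition: the log-based lookup yields nothing, but the ID is still known.
        PartitionTopicIdSketch offline = new PartitionTopicIdSketch(Optional.of(id), () -> Optional.empty());
        System.out.println(offline.topicId().isPresent());
    }
}
```

Under these assumptions, a partition marked offline would still report its topic ID, so the comparison of old and new topic IDs would not see an empty value for it.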





[jira] [Updated] (KAFKA-16365) AssignmentsManager mismanages completion notifications

2024-03-12 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim updated KAFKA-16365:
--
Affects Version/s: 3.7.0

> AssignmentsManager mismanages completion notifications
> --
>
> Key: KAFKA-16365
> URL: https://issues.apache.org/jira/browse/KAFKA-16365
> Project: Kafka
>  Issue Type: Sub-task
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
> Fix For: 3.7.1
>
>
> When moving replicas between directories in the same broker, future replica 
> promotion hinges on acknowledgment from the controller of a change in the 
> directory assignment.
>  
> ReplicaAlterLogDirsThread relies on AssignmentsManager for a completion 
> notification of the directory assignment change.
>  
> In its current form, under certain assignment scheduling, AssignmentsManager 
> can both miss completion notifications and prematurely trigger them.





[jira] [Updated] (KAFKA-16365) AssignmentsManager mismanages completion notifications

2024-03-12 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim updated KAFKA-16365:
--
Fix Version/s: 3.7.1

> AssignmentsManager mismanages completion notifications
> --
>
> Key: KAFKA-16365
> URL: https://issues.apache.org/jira/browse/KAFKA-16365
> Project: Kafka
>  Issue Type: Sub-task
>  Components: jbod
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
> Fix For: 3.7.1
>
>
> When moving replicas between directories in the same broker, future replica 
> promotion hinges on acknowledgment from the controller of a change in the 
> directory assignment.
>  
> ReplicaAlterLogDirsThread relies on AssignmentsManager for a completion 
> notification of the directory assignment change.
>  
> In its current form, under certain assignment scheduling, AssignmentsManager 
> can both miss completion notifications and prematurely trigger them.





[jira] [Updated] (KAFKA-16365) AssignmentsManager mismanages completion notifications

2024-03-12 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim updated KAFKA-16365:
--
Component/s: jbod

> AssignmentsManager mismanages completion notifications
> --
>
> Key: KAFKA-16365
> URL: https://issues.apache.org/jira/browse/KAFKA-16365
> Project: Kafka
>  Issue Type: Sub-task
>  Components: jbod
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
>
> When moving replicas between directories in the same broker, future replica 
> promotion hinges on acknowledgment from the controller of a change in the 
> directory assignment.
>  
> ReplicaAlterLogDirsThread relies on AssignmentsManager for a completion 
> notification of the directory assignment change.
>  
> In its current form, under certain assignment scheduling, AssignmentsManager 
> can both miss completion notifications and prematurely trigger them.





Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]

2024-03-12 Thread via GitHub


OmniaGM commented on code in PR #15335:
URL: https://github.com/apache/kafka/pull/15335#discussion_r1521445120


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -355,10 +355,11 @@ class LogManager(logDirs: Seq[File],
 } else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) {
   addStrayLog(topicPartition, log)
   warn(s"Loaded stray log: $logDir")
-} else if (shouldBeStrayKraftLog(log)) {
-  // Mark the partition directories we're not supposed to have as stray. 
We have to do this
-  // during log load because topics may have been recreated with the same 
name while a disk
-  // was offline.
+} else if (isStray(log.topicId, topicPartition)) {
+  // Opposite of Zookeeper mode deleted topic in KRAFT mode can be 
recreated while it's not fully deleted from broker.
+  // As a result of this broker in KRAFT mode with one offline directory 
has no way to detect to-be-deleted replica in an offline directory earlier.
+  // However, broker need to mark the partition directories as stray 
during log load because topics may have been
+  // recreated with the same name while a log directory was offline.

Review Comment:
   updated with the suggestion






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-12 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1521398482


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -243,33 +243,20 @@ public MemoryRecords build() {
 /**
  * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
  *
- * If the log append time is used, the offset will be the last offset 
unless no compression is used and

Review Comment:
   Updated. Thanks.






Re: [PR] MINOR: Improve logging in AssignmentsManager [kafka]

2024-03-12 Thread via GitHub


soarez commented on PR #15522:
URL: https://github.com/apache/kafka/pull/15522#issuecomment-1991528082

   @showuon PTAL





Re: [PR] KAFKA-16365: AssignmentsManager callback handling issues [kafka]

2024-03-12 Thread via GitHub


soarez commented on PR #15521:
URL: https://github.com/apache/kafka/pull/15521#issuecomment-1991526197

   @showuon @OmniaGM @gaurav-narula could you have a look?





Re: [PR] KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test [kafka]

2024-03-12 Thread via GitHub


nikramakrishnan commented on PR #15523:
URL: https://github.com/apache/kafka/pull/15523#issuecomment-1991480460

   Thanks for the change, @kamalcph! Just for my knowledge, why is there a 
possibility that the Shutdownable thread never ran in the CI for this test?





Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-12 Thread via GitHub


johnnychhsu commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1521314724


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -293,11 +293,11 @@ public ValidationResult 
assignOffsetsNonCompressed(LongRef offsetCounter,
 
 if (timestampType == TimestampType.LOG_APPEND_TIME) {
 maxTimestamp = now;
-offsetOfMaxTimestamp = initialOffset;
-}
-
-if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
-offsetOfMaxTimestamp = offsetCounter.value - 1;
+if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
+offsetOfMaxTimestamp = offsetCounter.value - 1;

Review Comment:
   thanks for the comment!
   this is the same behaviour as before the previous change affected the results. 
It differentiated the behaviour based on the magic value. I think the reason is 
this Javadoc on `RecordBatch`:
   
   
   > /**
   > A record batch is a container for records. In old versions of the record 
format (versions 0 and 1),
   > a batch consisted always of a single record if no compression was enabled, 
but could contain
   > many records otherwise. Newer versions (magic versions 2 and above) will 
generally contain many records
   > regardless of compression.
   >*/
   > public interface RecordBatch extends Iterable<Record>
   
   So the distinction doesn't matter for versions before magic value 2, where an 
uncompressed batch always holds a single record.
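
A small sketch of the branch discussed above for the `LOG_APPEND_TIME` case, assuming the `else` branch assigns `initialOffset` as in the other hunk of this PR quoted in the thread. The class and method names are hypothetical, and `nextOffset` stands in for `offsetCounter.value` after all records have been assigned offsets; this mirrors the diff under review, not necessarily the merged code.

```java
// Hypothetical, self-contained model of the hunk under review; not the actual LogValidator code.
class LogAppendTimeOffsetSketch {
    // Same numeric value as RecordBatch.MAGIC_VALUE_V2.
    static final byte MAGIC_VALUE_V2 = 2;

    // Offset reported for the max timestamp when every record is stamped with the broker time.
    static long offsetOfMaxTimestamp(byte toMagic, long initialOffset, long nextOffset) {
        if (toMagic >= MAGIC_VALUE_V2) {
            // v2 batches generally hold many records: report the last offset that was assigned.
            return nextOffset - 1;
        }
        // v0/v1 without compression: fall back to the initial offset.
        return initialOffset;
    }

    public static void main(String[] args) {
        // Five records assigned offsets 10..14, so the counter ends at 15.
        System.out.println(offsetOfMaxTimestamp((byte) 2, 10L, 15L));
        System.out.println(offsetOfMaxTimestamp((byte) 1, 10L, 15L));
    }
}
```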






Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-12 Thread via GitHub


johnnychhsu commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1521308393


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -293,11 +293,11 @@ public ValidationResult 
assignOffsetsNonCompressed(LongRef offsetCounter,
 
 if (timestampType == TimestampType.LOG_APPEND_TIME) {
 maxTimestamp = now;
-offsetOfMaxTimestamp = initialOffset;
-}
-
-if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
-offsetOfMaxTimestamp = offsetCounter.value - 1;
+if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
+offsetOfMaxTimestamp = offsetCounter.value - 1;
+} else {
+offsetOfMaxTimestamp = initialOffset;

Review Comment:
   thanks for the comment! 
   Yes, the naming is confusing: although it does represent the latest offset, 
it reads as if it were the offset of a single batch rather than of the record. 
Let me address this.






Re: [PR] KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test [kafka]

2024-03-12 Thread via GitHub


kamalcph commented on PR #15523:
URL: https://github.com/apache/kafka/pull/15523#issuecomment-1991432854

   @divijvaidya @showuon 
   
   Would you please review this minor change?





[PR] KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test [kafka]

2024-03-12 Thread via GitHub


kamalcph opened a new pull request, #15523:
URL: https://github.com/apache/kafka/pull/15523

   It is possible that the Shutdownable thread never ran in CI, which would 
explain why the test is failing.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Assigned] (KAFKA-15206) Flaky test RemoteIndexCacheTest.testClose()

2024-03-12 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-15206:


Assignee: Kamal Chandraprakash  (was: Lan Ding)

> Flaky test RemoteIndexCacheTest.testClose()
> ---
>
> Key: KAFKA-15206
> URL: https://issues.apache.org/jira/browse/KAFKA-15206
> Project: Kafka
>  Issue Type: Test
>Reporter: Divij Vaidya
>Assignee: Kamal Chandraprakash
>Priority: Minor
>  Labels: flaky-test
> Fix For: 3.8.0
>
>
> Test fails 2% of the time.
> [https://ge.apache.org/scans/tests?search.timeZoneId=Europe/Berlin=kafka.log.remote.RemoteIndexCacheTest=testClose()]
>  
> This test should be modified to test 
> assertTrue(cache.cleanerThread.isShutdownComplete) in a 
> TestUtils.waitUntilTrue condition which will catch the InterruptedException 
> and exit successfully on it.
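
The suggestion above amounts to polling for the shutdown flag instead of asserting it once. A minimal Java sketch of such a poll follows; the helper is a hypothetical stand-in for the project's `TestUtils.waitUntilTrue` (its name is borrowed, its parameters are assumptions), and an `AtomicBoolean` set by a background thread stands in for `cache.cleanerThread.isShutdownComplete`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical polling helper; not the real TestUtils.waitUntilTrue.
class WaitUntilTrueSketch {

    // Re-checks the condition every pauseMs until it holds or timeoutMs has elapsed.
    static void waitUntilTrue(BooleanSupplier condition, String message, long timeoutMs, long pauseMs) {
        long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new AssertionError(message);
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail the wait instead of swallowing the interrupt.
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting: " + message, e);
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for the cleaner thread's shutdown flag.
        AtomicBoolean shutdownComplete = new AtomicBoolean(false);
        Thread cleaner = new Thread(() -> shutdownComplete.set(true));
        cleaner.start();
        // Passes even if the background thread is scheduled late, which a one-shot assertTrue would not.
        waitUntilTrue(shutdownComplete::get, "cleaner thread did not shut down", 5000L, 10L);
        System.out.println("shutdown observed");
    }
}
```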




