[GitHub] [kafka] cmccabe merged pull request #14169: KAFKA-15318: Update the Authorizer via AclPublisher

2023-08-09 Thread via GitHub


cmccabe merged PR #14169:
URL: https://github.com/apache/kafka/pull/14169


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lihaosky opened a new pull request, #14181: KAFKA-15022: [10/N] docs for rack aware assignor

2023-08-09 Thread via GitHub


lihaosky opened a new pull request, #14181:
URL: https://github.com/apache/kafka/pull/14181

   Docs for rack aware assignor
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Resolved] (KAFKA-15288) Change BrokerApiVersionsCommandTest to support kraft mode

2023-08-09 Thread Deng Ziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deng Ziming resolved KAFKA-15288.
-
Resolution: Fixed

> Change BrokerApiVersionsCommandTest to support kraft mode
> -
>
> Key: KAFKA-15288
> URL: https://issues.apache.org/jira/browse/KAFKA-15288
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Deng Ziming
>Priority: Minor
>
> Currently we only test zk mode for BrokerApiVersionsCommand



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kamalcph commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-09 Thread via GitHub


kamalcph commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1289553687


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -459,49 +463,53 @@ public static void validateValues(Map props) {
         long maxCompactionLag = (Long) props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
         if (minCompactionLag > maxCompactionLag) {
             throw new InvalidConfigurationException("conflict topic config setting "
-                + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > "
-                + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")");
+                    + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > "
+                    + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")");
         }
+    }
 
-        if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
-            boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
-            String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
-            if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
-                throw new ConfigException("Remote log storage is unsupported for the compacted topics");
-            }
+    public static void validateValuesInBroker(Map props) {
+        validateValues(props);
+        Boolean isRemoteLogStorageSystemEnabled =
+                (Boolean) props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
+        Boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);

Review Comment:
   It takes the default value of `false`.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -459,49 +463,53 @@ public static void validateValues(Map props) {
         long maxCompactionLag = (Long) props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
         if (minCompactionLag > maxCompactionLag) {
             throw new InvalidConfigurationException("conflict topic config setting "
-                + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > "
-                + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")");
+                    + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > "
+                    + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")");
         }
+    }
 
-        if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
-            boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
-            String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
-            if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
-                throw new ConfigException("Remote log storage is unsupported for the compacted topics");
-            }
+    public static void validateValuesInBroker(Map props) {
+        validateValues(props);
+        Boolean isRemoteLogStorageSystemEnabled =
+                (Boolean) props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
+        Boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
+        if (!isRemoteLogStorageSystemEnabled && isRemoteStorageEnabled) {
+            throw new ConfigException("Tiered Storage functionality is disabled in the broker. " +
+                    "Topic cannot be configured with remote log storage.");
         }
 
-        if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) {
-            Long retentionBytes = (Long) props.get(TopicConfig.RETENTION_BYTES_CONFIG);
-            Long localLogRetentionBytes = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
-            if (retentionBytes > -1 && localLogRetentionBytes != -2) {
-                if (localLogRetentionBytes == -1) {
-                    String message = String.format("Value must not be -1 as %s value is set as %d.",
-                        TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes);
-                    throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message);
-                }
-                if (localLogRetentionBytes > retentionBytes) {
-                    String message = String.format("Value must not be more than %s property value: %d",
-                        TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes);
-                    throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message);

[GitHub] [kafka] dengziming merged pull request #14175: KAFKA-15288: Change BrokerApiVersionsCommandTest to support kraft mode

2023-08-09 Thread via GitHub


dengziming merged PR #14175:
URL: https://github.com/apache/kafka/pull/14175





[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-09 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1289577043


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -696,11 +704,327 @@ public void run() {
             }
         }
 
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        if (isLeader()) {
+            logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private final Optional retentionSizeData;
+        private final Optional retentionTimeData;
+
+        private long remainingBreachedSize;
+
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(Optional retentionSizeData, Optional retentionTimeData) {
+            this.retentionSizeData = retentionSizeData;
+            this.retentionTimeData = retentionTimeData;
+            remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (!retentionSizeData.isPresent()) {
+                return false;
+            }
+
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size >= 0
+                if (remainingBreachedSize > 0) {
+                    long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
+                    if (remainingBytes >= 0) {
+                        remainingBreachedSize = remainingBytes;
+                        return true;
+                    }
+                }
+
+                return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                        metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
+            }
+            return isSegmentDeleted;
+        }
+
+        public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (!retentionTimeData.isPresent()) {
+                return false;
+            }
+
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                    x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                // are ascending with in an epoch.
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
+                        metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            return isSegmentDeleted;
+        }
+
+        // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as
+        // unreferenced because they are not part of the current leader epoch lineage.
+        private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageExcept

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-09 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1289577612


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1464,12 +1513,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       }
     }
 
-    deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
+    deleteOldSegments(shouldDelete, RetentionSizeBreach(this, remoteLogEnabled()))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-      nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
+      nextSegmentOpt.exists(_.baseOffset <= localLogStartOffset())

Review Comment:
   Nice catch! Missed it while merging the conflicts.






[GitHub] [kafka] mehbey commented on a diff in pull request #14135: KAFKA-14991: Implementation of KIP-937 which improves message timesta…

2023-08-09 Thread via GitHub


mehbey commented on code in PR #14135:
URL: https://github.com/apache/kafka/pull/14135#discussion_r1289574391


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -186,6 +191,12 @@ public Optional serverConfigName(String configName) {
     @SuppressWarnings("deprecation")
     private static final String MESSAGE_FORMAT_VERSION_CONFIG = TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
 
+    @SuppressWarnings("deprecation")

Review Comment:
   This is for `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG`. It will be removed in 4.0. I have added a comment to make it clear.






[GitHub] [kafka] mehbey commented on a diff in pull request #14135: KAFKA-14991: Implementation of KIP-937 which improves message timesta…

2023-08-09 Thread via GitHub


mehbey commented on code in PR #14135:
URL: https://github.com/apache/kafka/pull/14135#discussion_r1289574103


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -1398,6 +1437,73 @@ class LogValidatorTest {
     assertEquals(6, e.recordErrors.size)
   }
 
+  @Test
+  def testRecordWithPastTimestampIsRejected(): Unit = {
+    val timestampBeforeMaxConfig = 24 * 60 * 60 * 1000L //24 hrs
+    val timestampAfterMaxConfig = 1 * 60 * 60 * 1000L //1 hr
+    val now = System.currentTimeMillis()
+    val fiveMinutesBeforeThreshold = now - timestampBeforeMaxConfig - (5 * 60 * 1000L)
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = fiveMinutesBeforeThreshold,
+      codec = CompressionType.GZIP)
+    val e = assertThrows(classOf[RecordValidationException],
+      () => new LogValidator(
+        records,
+        topicPartition,
+        time,
+        CompressionType.GZIP,
+        CompressionType.GZIP,
+        false,
+        RecordBatch.MAGIC_VALUE_V1,
+        TimestampType.CREATE_TIME,
+        timestampBeforeMaxConfig,
+        timestampAfterMaxConfig,
+        RecordBatch.NO_PARTITION_LEADER_EPOCH,
+        AppendOrigin.CLIENT,
+        MetadataVersion.latest
+      ).validateMessagesAndAssignOffsets(
+        PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
+      )
+    )
+
+    assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
+    assertFalse(e.recordErrors.isEmpty)
+    assertEquals(e.recordErrors.size, 3)
+  }
+
+
+  @Test
+  def testRecordWithFutureTimestampIsRejected(): Unit = {
+    val timestampBeforeMaxConfig = 24 * 60 * 60 * 1000L //24 hrs
+    val timestampAfterMaxConfig = 1 * 60 * 60 * 1000L //1 hr
+    val now = System.currentTimeMillis()

Review Comment:
   That will make the test a little more involved. The reason is that we are using a producer from the common test util `TestUtils.createProducer`, which spins up a test producer using the system's current timestamp. We could potentially refactor that code so that it takes a mock time, but without that the timestamp validation will use the system's current timestamp, hence these tests follow the same pattern as well.



##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -761,6 +761,7 @@ class KafkaConfigTest {
   }
 
   @Test
+  @nowarn("cat=deprecation")

Review Comment:
   Updated everywhere. Thank you.






[GitHub] [kafka] mehbey commented on a diff in pull request #14135: KAFKA-14991: Implementation of KIP-937 which improves message timesta…

2023-08-09 Thread via GitHub


mehbey commented on code in PR #14135:
URL: https://github.com/apache/kafka/pull/14135#discussion_r1289573699


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -1398,6 +1437,73 @@ class LogValidatorTest {
     assertEquals(6, e.recordErrors.size)
   }
 
+  @Test
+  def testRecordWithPastTimestampIsRejected(): Unit = {
+    val timestampBeforeMaxConfig = 24 * 60 * 60 * 1000L //24 hrs
+    val timestampAfterMaxConfig = 1 * 60 * 60 * 1000L //1 hr
+    val now = System.currentTimeMillis()
+    val fiveMinutesBeforeThreshold = now - timestampBeforeMaxConfig - (5 * 60 * 1000L)
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = fiveMinutesBeforeThreshold,
+      codec = CompressionType.GZIP)
+    val e = assertThrows(classOf[RecordValidationException],
+      () => new LogValidator(
+        records,
+        topicPartition,
+        time,
+        CompressionType.GZIP,
+        CompressionType.GZIP,
+        false,
+        RecordBatch.MAGIC_VALUE_V1,

Review Comment:
   No reason, just a bad copy-paste from other tests :). Updated, thank you.






[GitHub] [kafka] mehbey commented on a diff in pull request #14135: KAFKA-14991: Implementation of KIP-937 which improves message timesta…

2023-08-09 Thread via GitHub


mehbey commented on code in PR #14135:
URL: https://github.com/apache/kafka/pull/14135#discussion_r1289573138


##
core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala:
##
@@ -120,17 +121,17 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     }
   }
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testSendWithInvalidCreateTime(quorum: String): Unit = {
+  @ParameterizedTest
+  @MethodSource(Array("timestampConfigProvider"))
+  def testSendWithInvalidBeforeAndAfterTimestamp(messageTimeStampConfig: String, recordTimestamp: Long): Unit = {

Review Comment:
   Added more test cases for valid scenarios: 1) when the topic config threshold is the same as the record timestamp, and 2) when the record timestamp is within the range of the threshold.






[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-09 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1289566534


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                                 reason: SegmentDeletionReason): Int = {
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-      highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+      val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+      // Check not to delete segments which are not yet copied to tiered storage
+      val isSegmentTieredToRemoteStorage =

Review Comment:
   Local log size is based on the local retention configs and those are always less than or equal to the complete log retention.
   
   I'm unclear about the rationale behind retaining data in local storage using an overall retention size where there are no remote log segments. Please provide clarification.
   






[GitHub] [kafka] mehbey commented on a diff in pull request #14135: KAFKA-14991: Implementation of KIP-937 which improves message timesta…

2023-08-09 Thread via GitHub


mehbey commented on code in PR #14135:
URL: https://github.com/apache/kafka/pull/14135#discussion_r1289521270


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1862,7 +1884,35 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
     else MetadataVersion.fromVersionString(logMessageFormatVersionString)
 
   def logMessageTimestampType = TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
+
+  /* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for details */
+  @deprecated("3.6")
   def logMessageTimestampDifferenceMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
+
+  // In the transition period before logMessageTimestampDifferenceMaxMs is removed, to maintain backward compatibility,
+  // we are using its value if logMessageTimestampBeforeMaxMs default value hasn't changed.
+  @nowarn("cat=deprecation")
+  def logMessageTimestampBeforeMaxMs: Long = {
+    val messageTimestampBeforeMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampBeforeMaxMsProp)
+    if (messageTimestampBeforeMaxMs != Long.MaxValue) {

Review Comment:
   Yeah, that makes sense - updated.



##
core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala:
##
@@ -120,17 +121,17 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     }
   }
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)

Review Comment:
   The `quorum: String` parameter was not used in the test. There are similar unit tests in this file that define parameters like this with @ValueSource(strings = Array("zk", "kraft")) but don't actually use them in the test. I have refrained from cleaning up all the tests so as not to pollute this PR, but that can be a quick follow-up PR.
   
   If there is some reason I missed and we still want the parameterized value, we can add it to the argument provider method like:
   
   ```
   Arguments.of("zk", TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.box(System.currentTimeMillis() - fiveMinutesInMs)),
   Arguments.of("zk", TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, Long.box(System.currentTimeMillis() - fiveMinutesInMs)),
   Arguments.of("zk", TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.box(System.currentTimeMillis() + fiveMinutesInMs)),
   Arguments.of("kraft", TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.box(System.currentTimeMillis() - fiveMinutesInMs)),
   Arguments.of("kraft", TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, Long.box(System.currentTimeMillis() - fiveMinutesInMs)),
   Arguments.of("kraft", TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.box(System.currentTimeMillis() + fiveMinutesInMs)),
   ```






[jira] [Commented] (KAFKA-15240) BrokerToControllerChannelManager cache activeController error cause DefaultAlterPartitionManager send AlterPartition request failed

2023-08-09 Thread shilin Lu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752604#comment-17752604
 ] 

shilin Lu commented on KAFKA-15240:
---

[~cmccabe] [~hachikuji] please take a look at this issue, thanks

> BrokerToControllerChannelManager cache activeController error cause 
> DefaultAlterPartitionManager send AlterPartition request failed
> ---
>
> Key: KAFKA-15240
> URL: https://issues.apache.org/jira/browse/KAFKA-15240
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0, 2.8.1, 2.8.2, 3.5.0
> Environment: 2.8.1 kafka version
>Reporter: shilin Lu
>Assignee: shilin Lu
>Priority: Major
> Attachments: image-2023-07-24-16-35-56-589.png
>
>
> After KIP-497, the partition leader does not use ZooKeeper to propagate ISR changes; it sends an AlterPartition request to the controller instead. The broker caches the active controller node info through the controllerNodeProvider interface.
> On 2023.07.12, in a production Kafka environment, we saw many `Broker had a stale broker epoch` errors when sending AlterPartition requests to the controller, and in this cluster many replicas were missing from the ISR even though replica fetching was working correctly. So only the ISR-change propagation was failing.
> !https://iwiki.woa.com/tencent/api/attachments/s3/url?attachmentid=3165506!
> Something else was strange: if a broker fails to send an AlterPartition request, the controller should print a log like this, but the active controller node had no such log entry.
> !image-2023-07-24-16-35-56-589.png!
> I then suspected this broker was connected to the wrong active controller. Through a network packet capture, we found this broker sending requests to an unfamiliar broker port (9092). Referring to this cluster's operation history, the unfamiliar broker is an old broker node of this cluster that is now a controller node in a new Kafka cluster.
> Currently, BrokerToControllerChannelManager updates the active controller only on disconnect or when the response code is NOT_CONTROLLER. So when no request is sent and the stale broker node is the controller of another Kafka cluster, this situation repeats.





[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-09 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1289488513


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean,
 reason: SegmentDeletionReason): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-  highWatermark >= 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+  val upperBoundOffset = 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+  // Check not to delete segments which are not yet copied to tiered 
storage
+  val isSegmentTieredToRemoteStorage =
+if (remoteLogEnabled()) upperBoundOffset > 0 && upperBoundOffset - 1 
<= highestOffsetInRemoteStorage
+else true

Review Comment:
   No, this should be true if the remote storage is not enabled as this segment 
should be eligible based on other checks like `highWatermark >= 
upperBoundOffset && predicate(segment, nextSegmentOpt)`. Existing tests in 
`UnifiedLogTest`, `LogOffsetTest`, `LogLoaderTest`, `LogCleanerTest` already 
cover those scenarios.






[jira] [Commented] (KAFKA-9199) Improve handling of out of sequence errors lower than last acked sequence

2023-08-09 Thread Fei Xie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752596#comment-17752596
 ] 

Fei Xie commented on KAFKA-9199:


Hi [~hachikuji], is this issue still open? You've mentioned that this issue popped up in the reassignment system test. I was wondering if there are any instructions to rerun the failed system test? Thank you for answering my questions.

> Improve handling of out of sequence errors lower than last acked sequence
> -
>
> Key: KAFKA-9199
> URL: https://issues.apache.org/jira/browse/KAFKA-9199
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Jason Gustafson
>Priority: Major
>
> The broker attempts to cache the state of the last 5 batches in order to 
> enable duplicate detection. This caching is not guaranteed across restarts: 
> we only write the state of the last batch to the snapshot file. It is 
> possible in some cases for this to result in a sequence such as the following:
>  # Send sequence=n
>  # Sequence=n successfully written, but response is not received
>  # Leader changes after broker restart
>  # Send sequence=n+1
>  # Receive successful response for n+1
>  # Sequence=n times out and is retried, results in out of order sequence
> There are a couple problems here. First, it would probably be better for the 
> broker to return DUPLICATE_SEQUENCE_NUMBER when a sequence number is received 
> which is lower than any of the cached batches. Second, the producer handles 
> this situation by just retrying until expiration of the delivery timeout. 
> Instead it should just fail the batch. 
> This issue popped up in the reassignment system test. It ultimately caused 
> the test to fail because the producer was stuck retrying the duplicate batch 
> repeatedly until ultimately giving up.
>  
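A hedged sketch of the broker-side check suggested above (names like `SequenceCheck` are hypothetical; the real logic lives in the broker's producer state management and is more involved):

```
public class SequenceCheck {

    enum Result { OK, DUPLICATE_SEQUENCE_NUMBER, OUT_OF_ORDER_SEQUENCE_NUMBER }

    // lowestCachedSequence: smallest sequence among the ~5 cached batches.
    // expectedNextSequence: last acked sequence + 1.
    static Result check(final int incomingSequence,
                        final int lowestCachedSequence,
                        final int expectedNextSequence) {
        if (incomingSequence < lowestCachedSequence) {
            // Lower than anything still cached: per the suggestion above, answer
            // DUPLICATE_SEQUENCE_NUMBER so the producer can fail the batch quickly
            // instead of retrying until the delivery timeout expires.
            return Result.DUPLICATE_SEQUENCE_NUMBER;
        }
        if (incomingSequence < expectedNextSequence) {
            // Falls inside the cached window: a retried duplicate of a cached batch.
            return Result.DUPLICATE_SEQUENCE_NUMBER;
        }
        if (incomingSequence > expectedNextSequence) {
            return Result.OUT_OF_ORDER_SEQUENCE_NUMBER;
        }
        return Result.OK;
    }
}
```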





[GitHub] [kafka] mjsax merged pull request #14178: KAFKA-15022: [9/N] use RackAwareTaskAssignor in StickyTaskAssignor

2023-08-09 Thread via GitHub


mjsax merged PR #14178:
URL: https://github.com/apache/kafka/pull/14178





[GitHub] [kafka] showuon commented on pull request #14078: KAFKA-14780: Fix flaky test 'testSecondaryRefreshAfterElapsedDelay'

2023-08-09 Thread via GitHub


showuon commented on PR #14078:
URL: https://github.com/apache/kafka/pull/14078#issuecomment-1672467423

   The failed tests are listed here: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14078/2/
   They are all unrelated to your change. I'll review this PR later. Thanks for 
the contribution.





[GitHub] [kafka] dengziming merged pull request #14110: MINOR: Add test for describe topic with ID

2023-08-09 Thread via GitHub


dengziming merged PR #14110:
URL: https://github.com/apache/kafka/pull/14110





[GitHub] [kafka] olalamichelle commented on pull request #14078: KAFKA-14780: Fix flaky test 'testSecondaryRefreshAfterElapsedDelay'

2023-08-09 Thread via GitHub


olalamichelle commented on PR #14078:
URL: https://github.com/apache/kafka/pull/14078#issuecomment-1672436566

   Hi Divij,
   
   It is stating that "all checks have failed and this commit has test 
failures". But upon checking the merge details, I was not able to find the 
exact failure info. Could you please share some advice? Thank you so much!
   





[GitHub] [kafka] ex172000 commented on pull request #14110: MINOR: Add test for describe topic with ID

2023-08-09 Thread via GitHub


ex172000 commented on PR #14110:
URL: https://github.com/apache/kafka/pull/14110#issuecomment-1672429558

   @satishd can you help to review?





[GitHub] [kafka] mjsax commented on a diff in pull request #14149: HOTFIX: avoid placement of unnecessary transient standby tasks

2023-08-09 Thread via GitHub


mjsax commented on code in PR #14149:
URL: https://github.com/apache/kafka/pull/14149#discussion_r1289372985


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -1063,9 +1071,13 @@ private Map> buildStandbyTaskMap(final String consum
         }
 
         for (final TaskId task : revokedTasks) {
-            if (allStatefulTasks.contains(task)) {

Review Comment:
   > Not sure I fully understand the change here either. Would appreciate some more explanations --- specifically, could you give a simple example in the PR description?
   
   Also not 100% sure -- I left another comment -- however I believe we currently assign standbys out of thin air, even onto clients that should not have them, while we only want these standbys for the specific clients that had such a task in the previous assignment. But the details are also a little unclear to me.






[GitHub] [kafka] mjsax commented on a diff in pull request #14149: HOTFIX: avoid placement of unnecessary transient standby tasks

2023-08-09 Thread via GitHub


mjsax commented on code in PR #14149:
URL: https://github.com/apache/kafka/pull/14149#discussion_r1289369350


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -1063,9 +1071,13 @@ private Map> buildStandbyTaskMap(final String consum
         }
 
         for (final TaskId task : revokedTasks) {
-            if (allStatefulTasks.contains(task)) {

Review Comment:
   > Another noob question, how does standby work with an in-memory state store? Is it also reading from the changelog topic?
   
   Yes.
   
   > If so, what's the problem of losing state for an in-memory state store, as it can be restored from the changelog topic?
   
   The problem is that we would have to restore from the changelog, and we want to avoid this restore. In the end, there is a task that has the full state in memory, and we would close the task, drop the state, and re-create the same task and restore. We basically want to avoid the "close and re-create" part. (For RocksDB it's not an issue to close a task, because when we re-open it, the state is still on disk and restoring from the changelog won't be necessary.)






[GitHub] [kafka] mjsax merged pull request #14164: KAFKA-15022: [8/N] more tests for HAAssignor

2023-08-09 Thread via GitHub


mjsax merged PR #14164:
URL: https://github.com/apache/kafka/pull/14164





[GitHub] [kafka] junrao commented on pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-08-09 Thread via GitHub


junrao commented on PR #13947:
URL: https://github.com/apache/kafka/pull/13947#issuecomment-1672284941

   > I didn't follow previous changes, but I think current implementation also 
works for KRaft mode, right? I can see ReplicaManager#applyDelta is invoking 
stopPartitions, which will do what we expect. Is my understanding correct?
   
   @showuon : It seems the latest PR still doesn't support KRaft. The issue is 
that `ReplicaManager#applyDelta` is invoking a version of `stopPartitions()` 
that always sets `StopPartition.deleteRemoteLog` to false.
   





[GitHub] [kafka] junrao commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-08-09 Thread via GitHub


junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1289212842


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -526,25 +530,37 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStopA map from a topic partition to a boolean indicating
-   *whether the partition should be deleted.
+   * @param partitionsToStop A map from a topic partition to a boolean indicating
+   * whether the partition should be deleted.
+   * @return A map from partitions to exceptions which occurred.
+   * If no errors occurred, the map will be empty.
+   */
+  protected def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): Map[TopicPartition, Throwable] = {

Review Comment:
   All usage of this method is within this class. Could this be private?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -558,17 +574,38 @@ class ReplicaManager(val config: KafkaConfig,
       }
       partitionsToDelete += topicPartition
     }
+    if (stopPartition.deleteRemoteLog)
+      remotePartitionsToDelete += topicPartition
+
     // If we were the leader, we may have some operations still waiting for completion.
     // We force completion to prevent them from timing out.
     completeDelayedFetchOrProduceRequests(topicPartition)
   }
 
   // Third delete the logs and checkpoint.
   val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+  val remoteStorageErrorHandler = new BiConsumer[TopicPartition, Throwable] {
+    override def accept(tp: TopicPartition, e: Throwable): Unit = {
+      error(s"Error while stopping/deleting the remote log partition: $tp", e)
+      errorMap.put(tp, e)
+    }
+  }
+
   if (partitionsToDelete.nonEmpty) {
     // Delete the logs and checkpoint.
     logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, e))
   }
+  remoteLogManager.foreach(rlm => {

Review Comment:
   It's simpler to do `remoteLogManager.foreach{rlm => ...}` instead.



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -558,17 +574,38 @@ class ReplicaManager(val config: KafkaConfig,
       }
       partitionsToDelete += topicPartition
     }
+    if (stopPartition.deleteRemoteLog)
+      remotePartitionsToDelete += topicPartition
+
     // If we were the leader, we may have some operations still waiting for completion.
     // We force completion to prevent them from timing out.
     completeDelayedFetchOrProduceRequests(topicPartition)
   }
 
   // Third delete the logs and checkpoint.
   val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+  val remoteStorageErrorHandler = new BiConsumer[TopicPartition, Throwable] {

Review Comment:
   Do we need this handler since it only does logging?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -526,25 +530,37 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStopA map from a topic partition to a boolean indicating
-   *whether the partition should be deleted.
+   * @param partitionsToStop A map from a topic partition to a boolean indicating
+   * whether the partition should be deleted.
+   * @return A map from partitions to exceptions which occurred.
+   * If no errors occurred, the map will be empty.
+   */
+  protected def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): Map[TopicPartition, Throwable] = {
+    stopPartitions(partitionsToStop.map(e => StopPartition(e._1, e._2)).toSet)

Review Comment:
   Could we use map `{ case(tp, deleteLocal) ... }` to avoid unnamed references of `._1` and `._2`?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -558,17 +574,38 @@ class ReplicaManager(val config: KafkaConfig,
       }
       partitionsToDelete += topicPartition
     }
+    if (stopPartition.deleteRemoteLog)
+      remotePartitionsToDelete += topicPartition
+
     // If we were the leader, we may have some operations still waiting for completion.
     // We force completion to prevent them from timing out.
     completeDelayedFetchOrProduceRequests(topicPartition)
   }
 
   // Third delete the logs and checkpoint.
   val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+  val remoteStorageErrorHandler = new BiConsumer[TopicPartition, Throwable] {
+    override def accept(tp: TopicPartition, e: Throwable): Unit = {
+      error(s"Error while stopping/deleting the remote log partition: $tp", e)
+      errorMap.put(tp, e)
+    }
+  }
+
   if (partitionsToDelete.nonEmpty) {
     // Delete the logs and checkpoint.
     logManager.asyncDelete(par

[GitHub] [kafka] lucasbru opened a new pull request, #14180: Kafka Streams Threading: Non-busy wait for processable tasks

2023-08-09 Thread via GitHub


lucasbru opened a new pull request, #14180:
URL: https://github.com/apache/kafka/pull/14180

   Avoid busy waiting for processable tasks. We need to be a bit careful here not to have the task executors sleep when work is available. We have to make sure to signal on the condition variable any time a task becomes "processable". Here are some situations where a task becomes processable:
   
   - Task is unassigned from another `TaskExecutor`.
   - Task state is changed (should only happen when a task is locked inside the polling phase).
   - When tasks are unlocked.
   - When tasks are added.
   - New records are available.
   - A task is resumed.
   
   So in summary:
   
   - We should probably lock tasks when they are paused and unlock them when they are resumed. We should also wake the task executors after every polling phase. This belongs to the StreamThread integration work (separate PR). We add `DefaultTaskManager.signalProcessableTasks` for this.
   - We need to wake the task executors in `DefaultTaskManager.unassignTask`, `DefaultTaskManager.unlockTasks` and `DefaultTaskManager.add`.
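   As an illustration only, here is a minimal sketch of that signaling pattern using a `ReentrantLock` and a `Condition`. The names (`SimpleTaskQueue`, `takeNextProcessableTask`) are hypothetical and this is not the actual `DefaultTaskManager` code; it just shows how executors can block instead of busy waiting and how "task became processable" events wake them up.
   
   ```
   import java.util.ArrayDeque;
   import java.util.Queue;
   import java.util.concurrent.locks.Condition;
   import java.util.concurrent.locks.ReentrantLock;
   
   public class SimpleTaskQueue<T> {
       private final ReentrantLock lock = new ReentrantLock();
       private final Condition processableTasksExist = lock.newCondition();
       private final Queue<T> processableTasks = new ArrayDeque<>();
   
       // Called whenever a task becomes processable (added, unlocked, resumed, new records, ...).
       public void add(final T task) {
           lock.lock();
           try {
               processableTasks.add(task);
               // Wake one sleeping task executor instead of letting it spin.
               processableTasksExist.signal();
           } finally {
               lock.unlock();
           }
       }
   
       // Called by a task executor; blocks without busy waiting until work exists.
       public T takeNextProcessableTask() throws InterruptedException {
           lock.lock();
           try {
               while (processableTasks.isEmpty()) {
                   processableTasksExist.await();
               }
               return processableTasks.poll();
           } finally {
               lock.unlock();
           }
       }
   }
   ```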
   





[GitHub] [kafka] philipnee opened a new pull request, #14179: MINOR: CommitRequestManager should only poll when the coordinator node is known

2023-08-09 Thread via GitHub


philipnee opened a new pull request, #14179:
URL: https://github.com/apache/kafka/pull/14179

   As the title says, we discovered a flaky bug during testing: it would occasionally throw a NOT_COORDINATOR exception, which means the request was routed to a non-coordinator node. We found that if we don't check the coordinator node in the CommitRequestManager, the request manager will pass an empty node to the NetworkClientDelegate, which implies the request can be sent to any node in the cluster. This behavior is incorrect, as commit requests need to be routed to the coordinator node.
   
   Because the timing of coordinator discovery during integration testing isn't entirely deterministic, the test became extremely flaky. After this fix, a known coordinator node is mandatory before attempting to enqueue these commit requests to the NetworkClient.
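   For illustration, a minimal sketch of that guard, under the assumption that coordinator lookup may return empty until discovery completes. `CommitRequestGuard` and `CoordinatorLookup` are hypothetical names, not the actual consumer internals:
   
   ```
   import java.util.Optional;
   
   public class CommitRequestGuard {
   
       interface CoordinatorLookup {
           // Empty until the coordinator has been discovered.
           Optional<String> coordinatorNode();
       }
   
       private final CoordinatorLookup coordinator;
   
       public CommitRequestGuard(final CoordinatorLookup coordinator) {
           this.coordinator = coordinator;
       }
   
       // Returns the target node for a commit, or empty to signal "do not enqueue yet".
       public Optional<String> targetForCommit() {
           final Optional<String> node = coordinator.coordinatorNode();
           if (!node.isPresent()) {
               // Without this check the request could go to an arbitrary node
               // and fail with NOT_COORDINATOR; instead we wait for discovery.
               return Optional.empty();
           }
           return node;
       }
   }
   ```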





[jira] [Commented] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join

2023-08-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752522#comment-17752522
 ] 

Matthias J. Sax commented on KAFKA-14049:
-

I am still not sure if I understand this ticket? – Or is it a duplicate of 
https://issues.apache.org/jira/browse/KAFKA-12317 ?

> Relax Non Null Requirement for KStreamGlobalKTable Left Join
> 
>
> Key: KAFKA-14049
> URL: https://issues.apache.org/jira/browse/KAFKA-14049
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Saumya Gupta
>Assignee: Florin Akermann
>Priority: Major
>
> Null Values in the Stream for a Left Join would indicate a Tombstone Message 
> that needs to propagated if not actually joined with the GlobalKTable 
> message, hence these messages should not be ignored .





[jira] [Updated] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14049:

Labels:   (was: kip)

> Relax Non Null Requirement for KStreamGlobalKTable Left Join
> 
>
> Key: KAFKA-14049
> URL: https://issues.apache.org/jira/browse/KAFKA-14049
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Saumya Gupta
>Assignee: Florin Akermann
>Priority: Major
>
> Null Values in the Stream for a Left Join would indicate a Tombstone Message 
> that needs to propagated if not actually joined with the GlobalKTable 
> message, hence these messages should not be ignored .





[GitHub] [kafka] lihaosky commented on a diff in pull request #14178: KAFKA-15022: [9/N] use RackAwareTaskAssignor in StickyTaskAssignor

2023-08-09 Thread via GitHub


lihaosky commented on code in PR #14178:
URL: https://github.com/apache/kafka/pull/14178#discussion_r1289205241


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java:
##
@@ -617,21 +652,23 @@ static Map>> getRandomProcessRacks(final int
         }
         Collections.shuffle(racks);

Review Comment:
   Looks like `shuffle` can take a `Random`. I think we can use one static `Random` in this class.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java:
##
@@ -505,4 +505,15 @@ private void assertNotAssigned(final TaskId task) {
             throw new IllegalArgumentException("Tried to assign task " + task + ", but it is already assigned: " + this);
         }
     }
+
+    public ClientState copy() {

Review Comment:
   Makes sense. Let me add a copy constructor.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -346,6 +346,7 @@ public long optimizeActiveTasks(final SortedSet activeTasks,
         log.info("Assignment before active task optimization is {}\n with cost {}", clientStates,
                 activeTasksCost(activeTasks, clientStates, trafficCost, nonOverlapCost));
 
+        final long startTime = System.currentTimeMillis();

Review Comment:
   There's a time in `StreamPartitionAssignor`; we can pass it into the `RackAwareTaskAssignor` constructor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14049:

Labels: kip  (was: beginner newbie)

> Relax Non Null Requirement for KStreamGlobalKTable Left Join
> 
>
> Key: KAFKA-14049
> URL: https://issues.apache.org/jira/browse/KAFKA-14049
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Saumya Gupta
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
>
> Null Values in the Stream for a Left Join would indicate a Tombstone Message 
> that needs to be propagated if not actually joined with the GlobalKTable 
> message, hence these messages should not be ignored.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14748) Relax non-null FK left-join requirement

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14748:

Description: 
Kafka Streams enforces a strict non-null-key policy in the DSL across all 
key-dependent operations (like aggregations and joins).

This also applies to FK-joins, in particular to the ForeignKeyExtractor. If it 
returns `null`, it's treated as invalid. For left-joins, it might make sense to 
still accept a `null`, and add the left-hand record with an empty 
right-hand-side to the result.

KIP-962: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams]
 

  was:
Kafka Streams enforces a strict non-null-key policy in the DSL across all 
key-dependent operations (like aggregations and joins).

This also applies to FK-joins, in particular to the ForeignKeyExtractor. If it 
returns `null`, it's treated as invalid. For left-joins, it might make sense to 
still accept a `null`, and add the left-hand record with an empty 
right-hand-side to the result.


> Relax non-null FK left-join requirement
> ---
>
> Key: KAFKA-14748
> URL: https://issues.apache.org/jira/browse/KAFKA-14748
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
>
> Kafka Streams enforces a strict non-null-key policy in the DSL across all 
> key-dependent operations (like aggregations and joins).
> This also applies to FK-joins, in particular to the ForeignKeyExtractor. If 
> it returns `null`, it's treated as invalid. For left-joins, it might make 
> sense to still accept a `null`, and add the left-hand record with an empty 
> right-hand-side to the result.
> KIP-962: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams]
>  
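
For readers skimming the archive, a minimal sketch (Kafka Streams DSL, plain String 
values, made-up topic names and value format) of the foreign-key left join this 
ticket is about, where the extractor may return `null`:

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class FkLeftJoinSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> orders = builder.table("orders");
        KTable<String, String> customers = builder.table("customers");

        // Order values are assumed to look like "customerId,itemId" (made up for this sketch).
        KTable<String, String> enriched = orders.leftJoin(
            customers,
            // FK extractor; may return null: today the record is dropped,
            // the proposal is to keep it with an empty right-hand side.
            orderValue -> orderValue.contains(",") ? orderValue.split(",")[0] : null,
            // customerValue is null when no matching right-hand record exists.
            (orderValue, customerValue) -> orderValue + "/" + customerValue
        );
    }
}
{code}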



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14748) Relax non-null FK left-join requirement

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14748:

Labels: kip  (was: )

> Relax non-null FK left-join requirement
> ---
>
> Key: KAFKA-14748
> URL: https://issues.apache.org/jira/browse/KAFKA-14748
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
>
> Kafka Streams enforces a strict non-null-key policy in the DSL across all 
> key-dependent operations (like aggregations and joins).
> This also applies to FK-joins, in particular to the ForeignKeyExtractor. If 
> it returns `null`, it's treated as invalid. For left-joins, it might make 
> sense to still accept a `null`, and add the left-hand record with an empty 
> right-hand-side to the result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12317:

Description: 
Currently, for a stream-stream and stream-table/globalTable join, KafkaStreams 
drops all stream records with a `null` key (`null` join-key for 
stream-globalTable), because for a `null` (join) key the join is undefined: 
i.e., we don't have an attribute to do the table lookup with (we consider the 
stream record malformed). Note that we define the semantics of _left/outer_ 
join as: keep the stream record if no matching join record was found.

We could relax the definition of _left_ stream-table/globalTable and 
_left/outer_ stream-stream joins though, and not drop `null`-(join)key stream 
records, and instead call the ValueJoiner with a `null` "other-side" value: if 
the stream record key (or join-key) is `null`, we could treat it as a "failed 
lookup" instead of treating the stream record as corrupted.

If we make this change, users that want to keep the current behavior can add a 
`filter()` before the join to drop `null`-(join)key records from the stream 
explicitly.

Note that this change also requires changing the behavior if we insert a 
repartition topic before the join: currently, we drop `null`-key records before 
writing into the repartition topic (as we know they would be dropped later 
anyway). We need to relax this behavior for a left stream-table and left/outer 
stream-stream join. Users need to be aware (i.e., we might need to put this into 
the docs and JavaDocs) that records with a `null` key would be partitioned 
randomly.

KIP-962: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams]
 

  was:
Currently, for a stream-streams and stream-table/globalTable join KafkaStreams 
drops all stream records with a `null`-key (`null`-join-key for 
stream-globalTable), because for a `null`-(join)key the join is undefined: ie, 
we don't have an attribute the do the table lookup (we consider the 
stream-record as malformed). Note, that we define the semantics of _left/outer_ 
join as: keep the stream record if no matching join record was found.

We could relax the definition of _left_ stream-table/globalTable and 
_left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
records, and call the ValueJoiner with a `null` "other-side" value instead: if 
the stream record key (or join-key) is `null`, we could treat is as "failed 
lookup" instead of treating the stream record as corrupted.

If we make this change, users that want to keep the current behavior, can add a 
`filter()` before the join to drop `null`-(join)key records from the stream 
explicitly.

Note that this change also requires to change the behavior if we insert a 
repartition topic before the join: currently, we drop `null`-key record before 
writing into the repartition topic (as we know they would be dropped later 
anyway). We need to relax this behavior for a left stream-table and left/outer 
stream-stream join. User need to be aware (ie, we might need to put this into 
the docs and JavaDocs), that records with `null`-key would be partitioned 
randomly.


> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
>
> Currently, for a stream-streams and stream-table/globalTable join 
> KafkaStreams drops all stream records with a `null`{-}key (`null`-join-key 
> for stream-globalTable), because for a `null`{-}(join)key the join is 
> undefined: ie, we don't have an attribute the do the table lookup (we 
> consider the stream-record as malformed). Note, that we define the semantics 
> of _left/outer_ join as: keep the stream record if no matching join record 
> was found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
> records, and call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join-key) is `null`, we could treat is as 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users that want to keep the current behavior, can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires to change the behavior if we insert a 
> repartition topic before the join: currently, we drop `null`-key record 
> before writing into the repartition topic (as we know they would be dropped 
> later anyw

[jira] [Updated] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12317:

Labels: kip  (was: )

> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
>
> Currently, for a stream-streams and stream-table/globalTable join 
> KafkaStreams drops all stream records with a `null`-key (`null`-join-key for 
> stream-globalTable), because for a `null`-(join)key the join is undefined: 
> ie, we don't have an attribute the do the table lookup (we consider the 
> stream-record as malformed). Note, that we define the semantics of 
> _left/outer_ join as: keep the stream record if no matching join record was 
> found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
> records, and call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join-key) is `null`, we could treat is as 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users that want to keep the current behavior, can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires to change the behavior if we insert a 
> repartition topic before the join: currently, we drop `null`-key record 
> before writing into the repartition topic (as we know they would be dropped 
> later anyway). We need to relax this behavior for a left stream-table and 
> left/outer stream-stream join. User need to be aware (ie, we might need to 
> put this into the docs and JavaDocs), that records with `null`-key would be 
> partitioned randomly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2023-08-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752519#comment-17752519
 ] 

Matthias J. Sax commented on KAFKA-12317:
-

Just catching up on comments...
{quote}KAFKA-12845 is in status resolved so I assume this one is no longer 
relevant.
{quote}
Yes, it was closed as a duplicate of this ticket. I.e., we should make the change 
not just for left stream-table, but also for left stream-globalKTable.

Thanks [~guozhang]; I also prefer a KIP.

Thanks [~aki] for the KIP and PR.

> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>
> Currently, for a stream-streams and stream-table/globalTable join 
> KafkaStreams drops all stream records with a `null`-key (`null`-join-key for 
> stream-globalTable), because for a `null`-(join)key the join is undefined: 
> ie, we don't have an attribute the do the table lookup (we consider the 
> stream-record as malformed). Note, that we define the semantics of 
> _left/outer_ join as: keep the stream record if no matching join record was 
> found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
> records, and call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join-key) is `null`, we could treat is as 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users that want to keep the current behavior, can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires to change the behavior if we insert a 
> repartition topic before the join: currently, we drop `null`-key record 
> before writing into the repartition topic (as we know they would be dropped 
> later anyway). We need to relax this behavior for a left stream-table and 
> left/outer stream-stream join. User need to be aware (ie, we might need to 
> put this into the docs and JavaDocs), that records with `null`-key would be 
> partitioned randomly.
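
As a concrete illustration of the `filter()` workaround mentioned above, a minimal 
sketch (Kafka Streams DSL, plain String types, made-up topic names):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class DropNullKeysBeforeJoinSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> clicks = builder.stream("clicks");
        KTable<String, String> users = builder.table("users");

        // Keep today's semantics explicitly: drop null-key records before the left join,
        // so they are never forwarded to the ValueJoiner once null keys are allowed.
        KStream<String, String> joined = clicks
            .filter((key, value) -> key != null)
            .leftJoin(users, (click, user) -> click + "/" + user);
    }
}
{code}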



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] aindriu-aiven commented on a diff in pull request #14159: Kafka 15291 connect plugins implement versioned

2023-08-09 Thread via GitHub


aindriu-aiven commented on code in PR #14159:
URL: https://github.com/apache/kafka/pull/14159#discussion_r1289206301


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -145,10 +148,28 @@ public void testScanningMixOfValidAndInvalidPlugins() 
throws Exception {
 assertEquals(expectedClasses, classes);
 }
 
+@Test
+public void testScannedPluingsForUndefinedVersion() {
+PluginScanResult unversionedPluginsResult = 
scan(filterPluginsResourceDir(TestPlugins.pluginPath(), 
TestPlugins.TestPlugin.SAMPLING_HEADER_CONVERTER.resourceDir()));

Review Comment:
   Thanks. Manually committed to add the TestPlugins dependency workaround.



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -145,10 +148,28 @@ public void testScanningMixOfValidAndInvalidPlugins() 
throws Exception {
 assertEquals(expectedClasses, classes);
 }
 
+@Test
+public void testScannedPluingsForUndefinedVersion() {
+PluginScanResult unversionedPluginsResult = 
scan(filterPluginsResourceDir(TestPlugins.pluginPath(), 
TestPlugins.TestPlugin.SAMPLING_HEADER_CONVERTER.resourceDir()));
+assertFalse(unversionedPluginsResult.isEmpty());
+unversionedPluginsResult.forEach(pluginDesc -> 
assertEquals(PluginDesc.UNDEFINED_VERSION, pluginDesc.version()));
+}
+
+@Test
+public void testScannedPluingsForVersion() {
+PluginScanResult versionedPluginResult = 
scan(filterPluginsResourceDir(TestPlugins.pluginPath(), 
TestPlugins.TestPlugin.READ_VERSION_FROM_RESOURCE_V1.resourceDir()));

Review Comment:
   Thanks. Manually committed to add the TestPlugins dependency workaround.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14178: KAFKA-15022: [9/N] use RackAwareTaskAssignor in StickyTaskAssignor

2023-08-09 Thread via GitHub


mjsax commented on code in PR #14178:
URL: https://github.com/apache/kafka/pull/14178#discussion_r1289147669


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java:
##
@@ -505,4 +505,15 @@ private void assertNotAssigned(final TaskId task) {
 throw new IllegalArgumentException("Tried to assign task " + task 
+ ", but it is already assigned: " + this);
 }
 }
+
+public ClientState copy() {

Review Comment:
   I think we usually use a copy-constructor for such cases.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java:
##
@@ -36,11 +41,18 @@
 public class StickyTaskAssignor implements TaskAssignor {
 
 private static final Logger log = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1;
+private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 10;
+private static final int STATELESS_TRAFFIC_COST = 1;
+private static final int STATELESS_NON_OVERLAP_COST = 0;

Review Comment:
   Seems better to make the vars in the prod code accessible instead of 
duplicating them?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -346,6 +346,7 @@ public long optimizeActiveTasks(final SortedSet 
activeTasks,
 log.info("Assignment before active task optimization is {}\n with cost 
{}", clientStates,
 activeTasksCost(activeTasks, clientStates, trafficCost, 
nonOverlapCost));
 
+final long startTime = System.currentTimeMillis();

Review Comment:
   We should avoid calling `System.currentTimeMillis()` directly, but use the 
central `Time` object instead. Not sure how complex it would be to get a handle 
on it? (also below)
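
   For illustration, a minimal sketch of using an injected `Time` handle instead of
   `System.currentTimeMillis()` (the constructor below is simplified, not the real one):

   ```java
   import org.apache.kafka.common.utils.Time;

   public class OptimizationTimingSketch {
       private final Time time;

       public OptimizationTimingSketch(final Time time) {
           this.time = time;  // Time.SYSTEM in production, a MockTime in tests
       }

       public void optimize() {
           final long startTimeMs = time.milliseconds();
           // ... run the assignment optimization ...
           final long durationMs = time.milliseconds() - startTimeMs;
           System.out.println("Optimization took " + durationMs + " ms");
       }

       public static void main(String[] args) {
           new OptimizationTimingSketch(Time.SYSTEM).optimize();
       }
   }
   ```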



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java:
##
@@ -617,21 +652,23 @@ static Map>> 
getRandomProcessRacks(final int
 }
 Collections.shuffle(racks);

Review Comment:
   Just realizing that we might want to hand in a `Random` object as second 
parameter to allow us to reproduce a test run? Can you check the code for other 
places where we use `shuffle` to allow us to improve it across the board?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14831) Illegal state errors should be fatal in transactional producer

2023-08-09 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-14831.
---
Resolution: Fixed

> Illegal state errors should be fatal in transactional producer
> --
>
> Key: KAFKA-14831
> URL: https://issues.apache.org/jira/browse/KAFKA-14831
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.2.3
>Reporter: Jason Gustafson
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.6.0
>
>
> In KAFKA-14830, the producer hit an illegal state error. The error was 
> propagated to the {{Sender}} thread and logged, but the producer otherwise 
> continued on. It would be better to make illegal state errors fatal since 
> continuing to write to transactions when the internal state is inconsistent 
> may cause incorrect and unpredictable behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mumrah commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-09 Thread via GitHub


mumrah commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1289141277


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1498,6 +1543,30 @@ private void cancelNextWriteNoOpRecord() {
 queue.cancelDeferred(WRITE_NO_OP_RECORD);
 }
 
+private static final String WRITE_REMOVE_DELEGATIONTOKEN_RECORD = 
"writeRemoveDelegationTokenRecord";

Review Comment:
   I think this should be something like "maybeExpireDelegationTokens", 
to reflect what this event is doing (rather than what record ends up being used).



##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -176,8 +176,11 @@ public enum MetadataVersion {
 // Support for SCRAM
 IBP_3_5_IV2(11, "3.5", "IV2", true),
 
-// Remove leader epoch bump when KRaft controller shrinks the ISR 
(KAFKA-15021)
-IBP_3_6_IV0(12, "3.6", "IV0", false);
+// Support for Remove leader epoch bump when KRaft controller shrinks the 
ISR (KAFKA-15021)
+IBP_3_6_IV0(12, "3.6", "IV0", false),
+
+// Support for DelegationTokens

Review Comment:
   nit: "KRaft support for DelegationTokens"



##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##
@@ -633,4 +641,35 @@ void handleAclsDelta(AclsImage image, AclsDelta delta, 
KRaftMigrationOperationCo
 migrationClient.aclClient().writeResourceAcls(resourcePattern, 
accessControlEntries, migrationState));
 });
 }
+
+void handleDelegationTokenDelta(DelegationTokenImage image, 
DelegationTokenDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
+Set updatedResources = delta.changes().keySet();

Review Comment:
   nit: "updatedTokens" ?



##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+pri

[jira] [Commented] (KAFKA-15224) Automate version change to snapshot

2023-08-09 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752510#comment-17752510
 ] 

Divij Vaidya commented on KAFKA-15224:
--

> can we include new python packages?

Sure. Currently the expectation is for the release manager to manually set up the 
dependencies on the machine that they will use to build the release. You will 
find the release process here and observe manual instructions such as "Make 
sure you're running the script with Python3" or "$pip3 install jira". 
Please feel free to add a requirements.txt; in fact, I believe that is the 
preferred approach. Perhaps create a release folder and move release.py, 
requirements.txt and other new Python scripts that we may create into that folder.

> in the example PR, there are files with version bump without `SNAPSHOT` 
> suffix. Would this script update files with `\{version}-SNAPSHOT` always

No, not always. As an example, you will notice that we don't add the "SNAPSHOT" 
suffix to the fullDotVersion.js file. We run the set of commands in this PR when 
a release is done for an existing version and we want to prepare the branch for 
the next version (hence calling it a snapshot). Note that we do the inverse of 
these operations (removing the snapshot) at 
[https://github.com/apache/kafka/blob/trunk/release.py#L559-L577.] We do the 
inverse prior to the release.

Do these answer your questions?

> Automate version change to snapshot 
> 
>
> Key: KAFKA-15224
> URL: https://issues.apache.org/jira/browse/KAFKA-15224
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Tanay Karmarkar
>Priority: Minor
>
> We require changing to SNAPSHOT version as part of the release process [1]. 
> The specific manual steps are:
> Update version on the branch to 0.10.0.1-SNAPSHOT in the following places:
>  * 
>  ** docs/js/templateData.js
>  ** gradle.properties
>  ** kafka-merge-pr.py
>  ** streams/quickstart/java/pom.xml
>  ** streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
>  ** streams/quickstart/pom.xml
>  ** tests/kafkatest/_{_}init{_}_.py (note: this version name can't follow the 
> -SNAPSHOT convention due to python version naming restrictions, instead
> update it to 0.10.0.1.dev0)
>  ** tests/kafkatest/version.py
> The diff of the changes look like 
> [https://github.com/apache/kafka/commit/484a86feb562f645bdbec74b18f8a28395a686f7#diff-21a0ab11b8bbdab9930ad18d4bca2d943bbdf40d29d68ab8a96f765bd1f9]
>  
>  
> It would be nice if we could run a script to automatically do it. Note that 
> release.py (line 550) already does something similar where it replaces 
> SNAPSHOT with actual version. We need to do the opposite here. We can 
> repurpose that code in release.py and extract it into a new script to perform 
> this operation.
> [1] [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] lihaosky opened a new pull request, #14178: KAFKA-15022: [9/N] use RackAwareTaskAssignor in StickyTaskAssignor

2023-08-09 Thread via GitHub


lihaosky opened a new pull request, #14178:
URL: https://github.com/apache/kafka/pull/14178

   ## Description
   Use rack aware assignor in `StickyTaskAssignor`. This PR is on top of 
https://github.com/apache/kafka/pull/14164.
   
   ## Test
   Update existing unit test
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-15224) Automate version change to snapshot

2023-08-09 Thread Tanay Karmarkar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752504#comment-17752504
 ] 

Tanay Karmarkar edited comment on KAFKA-15224 at 8/9/23 7:46 PM:
-

[~divijvaidya] Another clarification needed, in the example PR, there are files 
with version bump without `SNAPSHOT` suffix. Would this script update files 
with `\{version}-SNAPSHOT` always?


was (Author: JIRAUSER301398):
Another clarification needed, in the example PR, there are files with version 
bump without `SNAPSHOT` suffix. Would this script update files with 
`\{version}-SNAPSHOT` always?

> Automate version change to snapshot 
> 
>
> Key: KAFKA-15224
> URL: https://issues.apache.org/jira/browse/KAFKA-15224
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Tanay Karmarkar
>Priority: Minor
>
> We require changing to SNAPSHOT version as part of the release process [1]. 
> The specific manual steps are:
> Update version on the branch to 0.10.0.1-SNAPSHOT in the following places:
>  * 
>  ** docs/js/templateData.js
>  ** gradle.properties
>  ** kafka-merge-pr.py
>  ** streams/quickstart/java/pom.xml
>  ** streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
>  ** streams/quickstart/pom.xml
>  ** tests/kafkatest/_{_}init{_}_.py (note: this version name can't follow the 
> -SNAPSHOT convention due to python version naming restrictions, instead
> update it to 0.10.0.1.dev0)
>  ** tests/kafkatest/version.py
> The diff of the changes look like 
> [https://github.com/apache/kafka/commit/484a86feb562f645bdbec74b18f8a28395a686f7#diff-21a0ab11b8bbdab9930ad18d4bca2d943bbdf40d29d68ab8a96f765bd1f9]
>  
>  
> It would be nice if we could run a script to automatically do it. Note that 
> release.py (line 550) already does something similar where it replaces 
> SNAPSHOT with actual version. We need to do the opposite here. We can 
> repurpose that code in release.py and extract it into a new script to perform 
> this operation.
> [1] [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15224) Automate version change to snapshot

2023-08-09 Thread Tanay Karmarkar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752504#comment-17752504
 ] 

Tanay Karmarkar commented on KAFKA-15224:
-

Another clarification needed, in the example PR, there are files with version 
bump without `SNAPSHOT` suffix. Would this script update files with 
`\{version}-SNAPSHOT` always?

> Automate version change to snapshot 
> 
>
> Key: KAFKA-15224
> URL: https://issues.apache.org/jira/browse/KAFKA-15224
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Tanay Karmarkar
>Priority: Minor
>
> We require changing to SNAPSHOT version as part of the release process [1]. 
> The specific manual steps are:
> Update version on the branch to 0.10.0.1-SNAPSHOT in the following places:
>  * 
>  ** docs/js/templateData.js
>  ** gradle.properties
>  ** kafka-merge-pr.py
>  ** streams/quickstart/java/pom.xml
>  ** streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
>  ** streams/quickstart/pom.xml
>  ** tests/kafkatest/_{_}init{_}_.py (note: this version name can't follow the 
> -SNAPSHOT convention due to python version naming restrictions, instead
> update it to 0.10.0.1.dev0)
>  ** tests/kafkatest/version.py
> The diff of the changes look like 
> [https://github.com/apache/kafka/commit/484a86feb562f645bdbec74b18f8a28395a686f7#diff-21a0ab11b8bbdab9930ad18d4bca2d943bbdf40d29d68ab8a96f765bd1f9]
>  
>  
> It would be nice if we could run a script to automatically do it. Note that 
> release.py (line 550) already does something similar where it replaces 
> SNAPSHOT with actual version. We need to do the opposite here. We can 
> repurpose that code in release.py and extract it into a new script to perform 
> this operation.
> [1] [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] omkreddy commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-09 Thread via GitHub


omkreddy commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1289107179


##
core/src/main/scala/kafka/server/DelegationTokenManager.scala:
##
@@ -165,10 +112,11 @@ object DelegationTokenManager {
 
 class DelegationTokenManager(val config: KafkaConfig,
  val tokenCache: DelegationTokenCache,
- val time: Time,
- val zkClient: KafkaZkClient) extends Logging {
+ val time: Time) extends Logging {
   this.logIdent = s"[Token Manager on Broker ${config.brokerId}]: "
 
+  protected val lock = new Object()
+

Review Comment:
   Can we remove the unused DescribeResponseCallback type variable at line 125?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15042) Clarify documentation on Tagged fields

2023-08-09 Thread Adrian Preston (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752503#comment-17752503
 ] 

Adrian Preston commented on KAFKA-15042:


[~smashingquasar],

Looking at the hex dump of your APIVersions request, I think you are correctly 
encoding the empty tag field as a single 0x00 byte. It looks, however, like the 
compact string encoding (used for the 'client_software_name' and 
'client_software_version' fields) is adding an unexpected 0x01 as the first 
byte.

For comparison, here's the APIVersions request I get from Sarama:

{{00 00 00 21    (Length)}}
{{00 12  (API key)}}
{{00 03  (API version)}}
{{00 00 00 00    (Correlation ID)}}
{{00 07 77 65 62 2d 61 70 69 (Client ID)}}
{{00 (Tagged fields)}}
{{08 77 65 62 2d 61 70 69    (Client software name)}}
{{06 30 2e 30 2e 31  (Client software version)}}
{{00 (Tagged fields)}}
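
To spell out the framing used in the dump above, a small sketch (illustrative only, 
not code from either client) of how a flexible-version "compact" string and an empty 
tagged-field section are written; it assumes lengths below 128 so each unsigned 
varint fits in a single byte:

{code:java}
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class CompactEncodingSketch {
    // Compact string: unsigned varint of (length + 1), then the UTF-8 bytes.
    // "web-api" (7 bytes) therefore starts with 0x08, with no extra leading byte.
    static void writeCompactString(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.write(bytes.length + 1);           // single-byte varint for short strings
        out.write(bytes, 0, bytes.length);
    }

    // An empty tagged-field section is the unsigned varint 0, i.e. a single 0x00.
    static void writeEmptyTaggedFields(ByteArrayOutputStream out) {
        out.write(0);
    }

    public static void main(String[] args) {
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        writeCompactString(body, "web-api");   // client_software_name
        writeCompactString(body, "0.0.1");     // client_software_version
        writeEmptyTaggedFields(body);
        // body now holds: 08 77 65 62 2d 61 70 69 06 30 2e 30 2e 31 00
        for (byte b : body.toByteArray()) {
            System.out.printf("%02x ", b);
        }
    }
}
{code}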

> Clarify documentation on Tagged fields
> --
>
> Key: KAFKA-15042
> URL: https://issues.apache.org/jira/browse/KAFKA-15042
> Project: Kafka
>  Issue Type: Wish
>  Components: docs
> Environment: Using the Ubuntu/Kafka Docker image for testing purposes.
>Reporter: Nicolas
>Assignee: Adrian Preston
>Priority: Major
>
> Hello,
> I am currently working on an implementation of the Kafka protocol.
> So far, all my code is working as intended through serialising requests and 
> deserialising response as long as I am not using the flex requests system.
> I am now trying to implement the flex requests system but the documentation 
> is scarce on the subject of tagged fields.
> If we take the Request Header v2:
>  
> {code:java}
> Request Header v2 => request_api_key request_api_version correlation_id 
> client_id TAG_BUFFER request_api_key => INT16 request_api_version => INT16 
> correlation_id => INT32 client_id => NULLABLE_STRING{code}
>  
> Here, the BNF seems violated. TAG_BUFFER is not a value in this situation. It 
> appears to be a type. It also does not appear within the detailed description 
> inside the BNF.
> TAG_BUFFER also does not refer to any declared type within the documentation. 
> It seems to indicate tagged fields though.
> Now when looking at tagged fields, the only mention of them within the 
> documentation is:
> {quote}Note that [KIP-482 tagged 
> fields|https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields]
>  can be added to a request without incrementing the version number. This 
> offers an additional way of evolving the message schema without breaking 
> compatibility. Tagged fields do not take up any space when the field is not 
> set. Therefore, if a field is rarely used, it is more efficient to make it a 
> tagged field than to put it in the mandatory schema. However, tagged fields 
> are ignored by recipients that don't know about them, which could pose a 
> challenge if this is not the behavior that the sender wants. In such cases, a 
> version bump may be more appropriate.
> {quote}
> This leads to the KIP-482 that does not clearly and explicitly detail the 
> process of writing and reading those tagged fields.
> I decided to look up existing clients to understand how they handle tagged 
> fields. I notably looked at kafka-js (JavaScript client) and librdkafka (C 
> client) and to my surprise, they have not implemented tagged fields. In fact, 
> they rely on a hack to skip them and ignore them completely.
> I also had a look at the [Java client bundled within Kafka within the Tagged 
> Fields 
> section|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java#L64].
> Now I am not a Java developer so I may not understand exactly this code. I 
> read the comment and implemented the logic following Google's Protobuf 
> specifications. The problem is, this leads to a request that outputs a stack 
> trace within Kafka (it would be appreciated to not just dump stack traces and 
> gracefully handle errors by the way).
>  
> As a reference, I tried to send an APIVersions (key: 18) (version: 3) request.
> My request reads as follows when converted to hexadecimal:
>  
> {code:java}
> Request header: 00 12 00 03 00 00 00 00 00 07 77 65 62 2d 61 70 69 00
> Request body: 01 08 77 65 62 2d 61 70 69 01 06 30 2e 30 2e 31 00
> Full request: 00 00 00 23 00 12 00 03 00 00 00 00 00 07 77 65 62 2d 61 70 69 
> 00 01 08 77 65 62 2d 61 70 69 01 06 30 2e 30 2e 31 00
> {code}
>  
> This creates a buffer underflow error within Kafka:
> {code:java}
> [2023-05-31 14:14:31,132] ERROR Exception while processing request from 
> 172.21.0.5:9092-172.21.0.3:59228-21 (kafka.network.Processor

[GitHub] [kafka] omkreddy commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-09 Thread via GitHub


omkreddy commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1289096459


##
core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala:
##
@@ -145,12 +163,12 @@ class DelegationTokenEndToEndAuthorizationTest extends 
EndToEndAuthorizationTest
   val privilegedAdminClient = 
createScramAdminClient(kafkaClientSaslMechanism, kafkaPrincipal.getName, 
kafkaPassword)
   try {
 val token = 
adminClient.createDelegationToken(createDelegationTokenOptions()).delegationToken().get()
-if (assert) {
-  assertToken(token)
-}
+//if (assert) {

Review Comment:
   Can we enable this code again?



##
core/src/main/scala/kafka/server/MetadataSupport.scala:
##
@@ -69,6 +69,16 @@ sealed trait MetadataSupport {
   handler(request)
 }
   }
+
+  def alsoMaybeForward(request: RequestChannel.Request,

Review Comment:
   looks unused



##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private DelegationTokenCache tokenCache = null;
+private String secretKeyString = null;
+private long tokenDefaultMaxLifetime = 0;
+private long tokenDefaultRenewLifetime = 0;
+private long tokenRemoverScanInterval = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+

[GitHub] [kafka] gharris1727 commented on a diff in pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced

2023-08-09 Thread via GitHub


gharris1727 commented on code in PR #14156:
URL: https://github.com/apache/kafka/pull/14156#discussion_r1289079347


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -227,7 +227,7 @@ private void clearSyncArray(OffsetSync[] syncs, OffsetSync 
offsetSync) {
 }
 }
 
-private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync[] original, 
OffsetSync offsetSync) {

Review Comment:
   That is already the signature of the caller, so this would amount to 
inlining this function into `updateExistingSyncs` to combine the two.
   
   I think that this function is already large enough, and doesn't also need to 
be concerned with copying the array or logging the result. I also think it 
makes batching updates simpler: The caller can manage the lifetimes of the 
arrays and their mutability, while the inner function can be concerned with 
just applying the updates.
   
   Also this code was doing that already, here and in the `clearSyncArray` 
function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15327) Async consumer should commit offsets on close

2023-08-09 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-15327:
--

 Summary: Async consumer should commit offsets on close
 Key: KAFKA-15327
 URL: https://issues.apache.org/jira/browse/KAFKA-15327
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Reporter: Lianet Magrans


In the current implementation of the KafkaConsumer, the ConsumerCoordinator 
commits offsets before the consumer is closed, with a call to 
maybeAutoCommitOffsetsSync(timer).
The async consumer should provide the same behaviour and commit offsets on 
close.
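
As a rough sketch of the requested behaviour (the class, method and field names 
below are hypothetical, not the real consumer internals):

{code:java}
import java.time.Duration;

public class CommitOnCloseSketch {
    private final boolean autoCommitEnabled = true;

    // Mirror the described behaviour: flush consumed-but-uncommitted offsets
    // synchronously, bounded by the close timeout, before releasing resources.
    public void close(Duration timeout) {
        long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
        try {
            if (autoCommitEnabled) {
                commitPendingOffsetsSync(deadlineMs);
            }
        } finally {
            releaseResources();
        }
    }

    private void commitPendingOffsetsSync(long deadlineMs) {
        // hypothetical: send an OffsetCommit and wait until done or the deadline is reached
    }

    private void releaseResources() {
        // hypothetical: shut down the background thread and close the network client
    }
}
{code}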



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on a diff in pull request #14159: Kafka 15291 connect plugins implement versioned

2023-08-09 Thread via GitHub


gharris1727 commented on code in PR #14159:
URL: https://github.com/apache/kafka/pull/14159#discussion_r1289026069


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -145,10 +148,28 @@ public void testScanningMixOfValidAndInvalidPlugins() 
throws Exception {
 assertEquals(expectedClasses, classes);
 }
 
+@Test
+public void testScannedPluingsForUndefinedVersion() {
+PluginScanResult unversionedPluginsResult = 
scan(filterPluginsResourceDir(TestPlugins.pluginPath(), 
TestPlugins.TestPlugin.SAMPLING_HEADER_CONVERTER.resourceDir()));

Review Comment:
   There's a utility method for assembling a plugin path of just select 
plugins, you don't need to do the filtering yourself :)
   ```suggestion
   PluginScanResult unversionedPluginsResult = 
scan(TestPlugins.pluginPath(TestPlugins.TestPlugin.SAMPLING_HEADER_CONVERTER));
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -145,10 +148,28 @@ public void testScanningMixOfValidAndInvalidPlugins() 
throws Exception {
 assertEquals(expectedClasses, classes);
 }
 
+@Test
+public void testScannedPluingsForUndefinedVersion() {
+PluginScanResult unversionedPluginsResult = 
scan(filterPluginsResourceDir(TestPlugins.pluginPath(), 
TestPlugins.TestPlugin.SAMPLING_HEADER_CONVERTER.resourceDir()));
+assertFalse(unversionedPluginsResult.isEmpty());
+unversionedPluginsResult.forEach(pluginDesc -> 
assertEquals(PluginDesc.UNDEFINED_VERSION, pluginDesc.version()));
+}
+
+@Test
+public void testScannedPluingsForVersion() {

Review Comment:
   ```suggestion
   public void testVersionedPluginsHasVersion() {
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -145,10 +148,28 @@ public void testScanningMixOfValidAndInvalidPlugins() 
throws Exception {
 assertEquals(expectedClasses, classes);
 }
 
+@Test
+public void testScannedPluingsForUndefinedVersion() {
+PluginScanResult unversionedPluginsResult = 
scan(filterPluginsResourceDir(TestPlugins.pluginPath(), 
TestPlugins.TestPlugin.SAMPLING_HEADER_CONVERTER.resourceDir()));
+assertFalse(unversionedPluginsResult.isEmpty());
+unversionedPluginsResult.forEach(pluginDesc -> 
assertEquals(PluginDesc.UNDEFINED_VERSION, pluginDesc.version()));
+}
+
+@Test
+public void testScannedPluingsForVersion() {
+PluginScanResult versionedPluginResult = 
scan(filterPluginsResourceDir(TestPlugins.pluginPath(), 
TestPlugins.TestPlugin.READ_VERSION_FROM_RESOURCE_V1.resourceDir()));

Review Comment:
   ```suggestion
   PluginScanResult versionedPluginResult = 
scan(TestPlugins.pluginPath(TestPlugins.TestPlugin.READ_VERSION_FROM_RESOURCE_V1));
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -145,10 +148,28 @@ public void testScanningMixOfValidAndInvalidPlugins() 
throws Exception {
 assertEquals(expectedClasses, classes);
 }
 
+@Test
+public void testScannedPluingsForUndefinedVersion() {

Review Comment:
   ```suggestion
   public void testNonVersionedPluginHasUndefinedVersion() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 opened a new pull request, #14177: MINOR: Fix SynchronizationTest Classloaders sometimes not being parallel capable

2023-08-09 Thread via GitHub


gharris1727 opened a new pull request, #14177:
URL: https://github.com/apache/kafka/pull/14177

   The `Classloader.registerAsParallelCapable()` call is intended to be called 
during static initialization. It must be called prior to the `Classloader` 
superconstructor, because there it is evaluated to decide whether the 
classloader instance created will be parallel capable.
   
   For the entire lifetime of this test, the 
`Classloader.registerAsParallelCapable()` call has been in an instance 
initializer, which is executed after the superconstructor is finished. This has 
meant that the first `SynchronizedDelegatingClassLoader`, and the first 
`SynchronizedPluginClassLoader` created have erroneously been 
non-parallel-capable.
   
   However, this test did not flaky-fail until #14089 was merged, because the 
`SamplingConverter` was never located in the first 
`SynchronizedPluginClassLoader`. With that PR, the 
`TestPlugins.pluginPath(TestPlugins.TestPlugin...)` function was changed to 
return a Set instead of a List. This meant that the `SamplingConverter` then 
_could_ appear in the first `SynchronizedPluginClassLoader`, and whenever that 
happened, the test would reproduce a deadlock and fail.
   
   Since there was only one `SynchronizedDelegatingClassLoader` created in this 
test, it was always non-parallel-capable. The test would only deadlock if 
_both_ classloaders were non-parallel-capable, which was not possible until 
recently.
   
   Importantly, this is only a bug in the test, as the real PluginClassLoader 
and DelegatingClassLoader have been parallel-capable since they were fixed.
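
   A minimal sketch of the difference (illustrative class names, not the test's actual loaders):

   ```java
   import java.net.URL;
   import java.net.URLClassLoader;

   // Correct pattern: the static initializer runs before any constructor, so the
   // ClassLoader superconstructor already sees this class registered as parallel capable.
   class ParallelCapableLoader extends URLClassLoader {
       static {
           registerAsParallelCapable();
       }

       ParallelCapableLoader(URL[] urls, ClassLoader parent) {
           super(urls, parent);
       }
   }

   // Buggy pattern described above: an instance initializer only runs after the
   // superconstructor has finished, so the first instance of this class is created
   // before the registration takes effect and ends up non-parallel-capable.
   class LateRegisteringLoader extends URLClassLoader {
       {
           registerAsParallelCapable();
       }

       LateRegisteringLoader(URL[] urls, ClassLoader parent) {
           super(urls, parent);
       }
   }
   ```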
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-09 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1288946333


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -144,13 +159,25 @@ public long metadataExpireMs() {
  */
 public synchronized int requestUpdate() {
 this.needFullUpdate = true;
+this.backoffUpdateRequests = 0L;

Review Comment:
   > My understanding of the PR is this, metadata request won't backoff, but 
produce request would backoff. So likely metadata is going to be updated next 
time around produce request is retried(post backoff).
   
   @msn-tldr : To me, the common reason why a produce request needs to back off 
is that the metadata is stale, since the latest metadata hasn't been propagated 
to the brokers yet. So, if we don't back off the metadata request, the returned 
metadata may still be stale, which won't help the backed-off produce request.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property

2023-08-09 Thread via GitHub


divijvaidya commented on code in PR #14161:
URL: https://github.com/apache/kafka/pull/14161#discussion_r1288939068


##
core/src/main/scala/kafka/server/ConfigHandler.scala:
##
@@ -62,6 +62,12 @@ class TopicConfigHandler(private val logManager: LogManager, 
kafkaConfig: KafkaC
 topicConfig.asScala.forKeyValue { (key, value) =>
   if (!configNamesToExclude.contains(key)) props.put(key, value)
 }
+
+if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()

Review Comment:
   @clolov could you please check if we call some form of LogConfig.validate() 
on startup? (I think we do when we create LogManager). In which case you may 
want to bring the broker level config to LogConfig like this: 
https://github.com/apache/kafka/pull/14176 and add your new validation in 
LogManager.validate(), similar to how we are validating in the other PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-09 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1288894765


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -459,49 +463,53 @@ public static void validateValues(Map props) {
 long maxCompactionLag = (Long) 
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
 if (minCompactionLag > maxCompactionLag) {
 throw new InvalidConfigurationException("conflict topic config 
setting "
-+ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
-+ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
++ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
++ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
 }
+}
 
-if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
-boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
-String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
-if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
-throw new ConfigException("Remote log storage is unsupported 
for the compacted topics");
-}
+public static void validateValuesInBroker(Map props) {
+validateValues(props);
+Boolean isRemoteLogStorageSystemEnabled =
+(Boolean) 
props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
+Boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
+if (!isRemoteLogStorageSystemEnabled && isRemoteStorageEnabled) {
+throw new ConfigException("Tiered Storage functionality is 
disabled in the broker. " +
+"Topic cannot be configured with remote log storage.");
 }
 
-if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) {
-Long retentionBytes = (Long) 
props.get(TopicConfig.RETENTION_BYTES_CONFIG);
-Long localLogRetentionBytes = (Long) 
props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
-if (retentionBytes > -1 && localLogRetentionBytes != -2) {
-if (localLogRetentionBytes == -1) {
-String message = String.format("Value must not be -1 as %s 
value is set as %d.",
-TopicConfig.RETENTION_BYTES_CONFIG, 
retentionBytes);
-throw new 
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localLogRetentionBytes, message);
-}
-if (localLogRetentionBytes > retentionBytes) {
-String message = String.format("Value must not be more 
than %s property value: %d",
-TopicConfig.RETENTION_BYTES_CONFIG, 
retentionBytes);
-throw new 
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localLogRetentionBytes, message);
-}
+String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
+if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
+throw new ConfigException("Remote log storage is unsupported for 
the compacted topics");
+}

Review Comment:
   could be a separate function - `validateNoRemoteStorageForCompactedTopic`
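
   A minimal sketch of the suggested extraction, assuming the same `Map<String, Object>` 
props layout used by the surrounding validation code (illustrative only, not the 
PR's final code):

   ```java
   import java.util.Locale;
   import java.util.Map;
   import org.apache.kafka.common.config.ConfigException;
   import org.apache.kafka.common.config.TopicConfig;

   final class CompactedTopicRemoteStorageCheck {
       // Same check as in the diff above, pulled out into its own helper.
       static void validateNoRemoteStorageForCompactedTopic(Map<String, Object> props) {
           boolean isRemoteStorageEnabled =
               (Boolean) props.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, false);
           String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG)
               .toString().toLowerCase(Locale.getDefault());
           if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
               throw new ConfigException("Remote log storage is unsupported for the compacted topics");
           }
       }
   }
   ```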



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -459,49 +463,53 @@ public static void validateValues(Map props) {
 long maxCompactionLag = (Long) 
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
 if (minCompactionLag > maxCompactionLag) {
 throw new InvalidConfigurationException("conflict topic config 
setting "
-+ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
-+ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
++ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
++ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
 }
+}
 
-if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
-boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
-String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
-if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
-throw new ConfigException("Remote log storage is unsupported 
for the c

[GitHub] [kafka] junrao commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-09 Thread via GitHub


junrao commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1287764297


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -956,6 +981,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 }
   }
 
+  private def maybeIncrementLocalLogStartOffset(newLocalLogStartOffset: Long, 
reason: LogStartOffsetIncrementReason): Unit = {
+lock synchronized {
+  if (newLocalLogStartOffset > localLogStartOffset()) {
+_localLogStartOffset = math.max(newLocalLogStartOffset, 
localLogStartOffset());

Review Comment:
   Since newLocalLogStartOffset is larger than localLogStartOffset(), could we 
just assign newLocalLogStartOffset to _localLogStartOffset?



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -966,7 +1000,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* @throws OffsetOutOfRangeException if the log start offset is greater than 
the high watermark
* @return true if the log start offset was updated; otherwise false
*/
-  def maybeIncrementLogStartOffset(newLogStartOffset: Long, reason: 
LogStartOffsetIncrementReason): Boolean = {
+  def maybeIncrementLogStartOffset(newLogStartOffset: Long,
+   reason: 
LogStartOffsetIncrementReason): Boolean = {

Review Comment:
   Indentation doesn't match other places in this file.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean,
 reason: SegmentDeletionReason): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-  highWatermark >= 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+  val upperBoundOffset = 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+  // Check not to delete segments which are not yet copied to tiered 
storage
+  val isSegmentTieredToRemoteStorage =

Review Comment:
   Here is a corner case. Let's say remote log is enabled, but there is no 
remote segment (all have been deleted due to retention). The new logic will do 
retention based on `localRetentionBytes`, but it should actually do the 
retention based on `retentionSize`. If that happens, we need to advance 
logStartOffset, in addition to localLogStartOffset.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1464,12 +1513,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 }
 
-deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
+deleteOldSegments(shouldDelete, RetentionSizeBreach(this, 
remoteLogEnabled()))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-  nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
+  nextSegmentOpt.exists(_.baseOffset <= localLogStartOffset())

Review Comment:
   This doesn't look right. If remote log is not enabled, it seems that we 
should delete based on logStartOffset, not localLogStartOffset.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean,
 reason: SegmentDeletionReason): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-  highWatermark >= 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+  val upperBoundOffset = 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+  // Check not to delete segments which are not yet copied to tiered 
storage
+  val isSegmentTieredToRemoteStorage =
+if (remoteLogEnabled()) upperBoundOffset > 0 && upperBoundOffset - 1 
<= highestOffsetInRemoteStorage
+else true

Review Comment:
   Hmm, this should be false, right? Do we have a test case to cover that?



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -147,11 +147,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   def localLogStartOffset(): Long = _localLogStartOffset
 
+  // This is the offset(inclusive) until which segments are copied to the 
remote storage.
   @volatile private var highestOffsetInRemoteStorage: Long = -1L
 
   locally {
 initializePartitionMetadata()
 updateLogStartOffset(logStartOffset)
+updateLocalLogStartOffset(math.max(logStartOffset, 
localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))

Review Comment:
   This is an existing issue. But there is one direct reference to 
`_localLogStartOffset` in `fetchOffsetByTimestamp()`. Should we change that to 
use `localLogStartOffset()` instead?



###

[GitHub] [kafka] C0urante commented on a diff in pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced

2023-08-09 Thread via GitHub


C0urante commented on code in PR #14156:
URL: https://github.com/apache/kafka/pull/14156#discussion_r1287870397


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -227,7 +227,7 @@ private void clearSyncArray(OffsetSync[] syncs, OffsetSync 
offsetSync) {
 }
 }
 
-private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync[] original, 
OffsetSync offsetSync) {

Review Comment:
   Nit: feels a little strange that we're passing in a to-be-mutated sync 
array. Any reason not to alter `updateSyncArray` to only take in the original 
sync array and the new sync, and construct and return the new sync array?
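
   To illustrate the shape being suggested (the compaction/demotion logic of the 
real `updateSyncArray` is not reproduced here; this is only a hypothetical sketch 
assuming the class sits next to `OffsetSyncStore`):

   ```java
   import java.util.Arrays;

   final class SyncArrayUpdateSketch {
       // Take the previous array plus the new sync, and hand back a fresh array
       // instead of mutating an array that was passed in by the caller.
       static OffsetSync[] updatedCopy(OffsetSync[] original, OffsetSync latest) {
           OffsetSync[] syncs = Arrays.copyOf(original, original.length);
           syncs[0] = latest; // slot 0 holds the most recent sync (invariant A in the tests)
           return syncs;
       }
   }
   ```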



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -153,31 +156,84 @@ public void testPastOffsetTranslation() {
 }
 
 @Test
-public void testKeepMostDistinctSyncs() {
+public void testConsistentlySpacedSyncs() {
 // Under normal operation, the incoming syncs will be regularly spaced 
and the store should keep a set of syncs
 // which provide the best translation accuracy (expires as few syncs 
as possible)
-// Each new sync should be added to the cache and expire at most one 
other sync from the cache
-long iterations = 1;
+long iterations = 100;
 long maxStep = Long.MAX_VALUE / iterations;
 // Test a variety of steps (corresponding to the offset.lag.max 
configuration)
 for (long step = 1; step < maxStep; step = (step * 2) + 1)  {
 for (long firstOffset = 0; firstOffset < 30; firstOffset++) {
-try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-int lastCount = 1;
-store.start();
-for (long offset = firstOffset; offset <= iterations; 
offset += step) {
-store.sync(tp, offset, offset);
-// Invariant A: the latest sync is present
-assertEquals(offset, store.syncFor(tp, 
0).upstreamOffset());
-// Invariant D: the earliest sync is present
-assertEquals(firstOffset, store.syncFor(tp, 
63).upstreamOffset());
-int count = countDistinctStoredSyncs(store, tp);
-int diff = count - lastCount;
-assertTrue(diff >= 0,
-"Store expired too many syncs: " + diff + " 
after receiving offset " + offset);
-lastCount = count;
-}
-}
+long finalStep = step;
+// Generate a stream of consistently spaced syncs
+// Each new sync should be added to the cache and expire at 
most one other sync from the cache
+assertSyncSpacingHasBoundedExpirations(firstOffset, 
LongStream.generate(() -> finalStep).limit(iterations), 1);
+}
+}
+}
+
+@Test
+public void testRandomlySpacedSyncs() {
+Random random = new Random(0L); // arbitrary but deterministic seed
+int iterationBits = 10;
+long iterations = 1 << iterationBits;
+for (int n = 1; n < Long.SIZE - iterationBits; n++) {
+// A stream with at most n bits of difference between the largest 
and smallest steps
+// will expire n + 2 syncs at once in the worst case, because the 
sync store is laid out exponentially.
+long maximumDifference = 1L << n;
+int maximumExpirations = n + 2;
+assertSyncSpacingHasBoundedExpirations(0, random.longs(iterations, 
0L, maximumDifference), maximumExpirations);
+// This holds true even if there is a larger minimum step size, 
such as caused by offsetLagMax
+long offsetLagMax = 1L << 16;
+assertSyncSpacingHasBoundedExpirations(0, random.longs(iterations, 
offsetLagMax, offsetLagMax + maximumDifference), maximumExpirations);
+}
+}
+
+@Test
+public void testDroppedSyncsSpacing() {
+Random random = new Random(0L); // arbitrary but deterministic seed
+long iterations = 1;
+long offsetLagMax = 100;
+// Half of the gaps will be offsetLagMax, and half will be double 
that, as if one intervening sync was dropped.
+LongStream stream = random.doubles()
+.mapToLong(d -> (d < 0.5 ? 2 : 1) * offsetLagMax)
+.limit(iterations);
+// This will cause up to 2 syncs to be discarded, because a sequence 
of two adjacent syncs followed by a
+// dropped sync will set up the following situation
+// before [dd,c,b,a]
+// after  [e..e,d,a]
+// and syncs b and c are discarded to make room for e and the demoted 
sync d.
+assertSyncSpacingHasBoundedExpi

[GitHub] [kafka] cmccabe commented on a diff in pull request #14169: KAFKA-15318: Update the Authorizer via AclPublisher

2023-08-09 Thread via GitHub


cmccabe commented on code in PR #14169:
URL: https://github.com/apache/kafka/pull/14169#discussion_r1288918541


##
core/src/main/scala/kafka/server/metadata/AclPublisher.scala:
##
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.utils.Logging
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
+import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.fault.FaultHandler
+
+import scala.concurrent.TimeoutException
+
+
+class AclPublisher(
+  id: Int,
+  faultHandler: FaultHandler,
+  nodeType: String,
+  authorizer: Option[Authorizer],
+) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
+  logIdent = s"[${name()}] "
+
+  override def name(): String = s"AclPublisher ${nodeType} id=${id}"
+
+  var completedInitialLoad = false
+
+  override def onMetadataUpdate(
+delta: MetadataDelta,
+newImage: MetadataImage,
+manifest: LoaderManifest
+  ): Unit = {
+val deltaName = s"MetadataDelta up to ${newImage.offset()}"
+
+// Apply changes to ACLs. This needs to be handled carefully because while 
we are
+// applying these changes, the Authorizer is continuing to return 
authorization
+// results in other threads. We never want to expose an invalid state. For 
example,
+// if the user created a DENY ALL acl and then created an ALLOW ACL for 
topic foo,
+// we want to apply those changes in that order, not the reverse order! 
Otherwise
+// there could be a window during which incorrect authorization results 
are returned.
+Option(delta.aclsDelta()).foreach { aclsDelta =>
+  authorizer match {
+case Some(authorizer: ClusterMetadataAuthorizer) => if 
(aclsDelta.isSnapshotDelta) {

Review Comment:
   yes, good point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15319:

Priority: Critical  (was: Major)

> Upgrade rocksdb to fix CVE-2022-37434
> -
>
> Key: KAFKA-15319
> URL: https://issues.apache.org/jira/browse/KAFKA-15319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.1
>Reporter: Maruthi
>Priority: Critical
>
> Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12
> Upgrade to 1.2.13 to fix 
> https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15319:

Component/s: streams

> Upgrade rocksdb to fix CVE-2022-37434
> -
>
> Key: KAFKA-15319
> URL: https://issues.apache.org/jira/browse/KAFKA-15319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.1
>Reporter: Maruthi
>Priority: Major
>
> Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12
> Upgrade to 1.2.13 to fix 
> https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.

2023-08-09 Thread via GitHub


gharris1727 commented on PR #14064:
URL: https://github.com/apache/kafka/pull/14064#issuecomment-1671793380

   Looking ahead at the `sync-manifests` implementation, I realized that I 
had some duplication. The `ManifestEntry`, `findManifests` function, and the 
exclude behavior I added in this PR are all redundant and will be replaced with 
a different system in the next PR.
   
   The new implementation has a lot more machinery included with it that would 
be unmotivated in this PR, so I'm going to leave this current but obsolete 
implementation in place for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r128079


##
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+private static final Logger LOG = 
LoggerFactory.getLogger(LeaderElectionCommand.class);
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+
+public static void main(String... args) {
+try {
+run(Duration.ofMillis(3), args);
+} catch (TerseException e) {
+System.err.println(e.getMessage());
+} catch (Exception e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+}
+}
+
+static void run(Duration timeoutMs, String... args) throws Exception {
+LeaderElectionCommandOptions commandOptions = new 
LeaderElectionCommandOptions(args);
+
+commandOptions.maybePrintHelpOrVersion();
+
+commandOptions.validate();
+ElectionType electionType = commandOptions.getElectionType();
+Optional> jsonFileTopicPartitions =
+Optional.ofNullable(commandOptions.getPathToJsonFile())
+.map(path -> parseReplicaElectionData(path));
+
+Optional topicOption = 
Optional.ofNullable(commandOptions.getTopic());
+Optional partitionOption = 
Optional.ofNullable(commandOptions.getPartition());
+final Optional> singleTopicPartition =
+(topicOption.isPresent() && partitionOption.isPresent()) ?
+Optional.of(Collections.singleton(new 
TopicPartition(topicOption.get(), partitionOption.get( :
+Optional.empty();
+
+/* Note: No need to look at --all-topic-partitions as we want this to 
be null if it is use.
+ * The validate function should be checking that this option is 
required if the --topic and --path-to-json-file
+ * are not specified.
+ */
+Optional> topicPartitions = 
jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+Properties props = new Properties();
+if (commandOptions.hasAdminClientConfig()) {
+
props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+}
+props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
commandOptions.getBootstrapServer(

[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r127858


##
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+private static final Logger LOG = 
LoggerFactory.getLogger(LeaderElectionCommand.class);
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+
+public static void main(String... args) {
+try {
+run(Duration.ofMillis(3), args);
+} catch (TerseException e) {
+System.err.println(e.getMessage());
+} catch (Exception e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+}
+}
+
+static void run(Duration timeoutMs, String... args) throws Exception {
+LeaderElectionCommandOptions commandOptions = new 
LeaderElectionCommandOptions(args);
+
+commandOptions.maybePrintHelpOrVersion();
+
+commandOptions.validate();
+ElectionType electionType = commandOptions.getElectionType();
+Optional> jsonFileTopicPartitions =
+Optional.ofNullable(commandOptions.getPathToJsonFile())
+.map(path -> parseReplicaElectionData(path));
+
+Optional topicOption = 
Optional.ofNullable(commandOptions.getTopic());
+Optional partitionOption = 
Optional.ofNullable(commandOptions.getPartition());
+final Optional> singleTopicPartition =
+(topicOption.isPresent() && partitionOption.isPresent()) ?
+Optional.of(Collections.singleton(new 
TopicPartition(topicOption.get(), partitionOption.get( :
+Optional.empty();
+
+/* Note: No need to look at --all-topic-partitions as we want this to 
be null if it is use.
+ * The validate function should be checking that this option is 
required if the --topic and --path-to-json-file
+ * are not specified.
+ */
+Optional> topicPartitions = 
jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+Properties props = new Properties();
+if (commandOptions.hasAdminClientConfig()) {
+
props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+}
+props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
commandOptions.getBootstrapServer(

[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r127595


##
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+private static final Logger LOG = 
LoggerFactory.getLogger(LeaderElectionCommand.class);
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+
+public static void main(String... args) {
+try {
+run(Duration.ofMillis(3), args);
+} catch (TerseException e) {
+System.err.println(e.getMessage());
+} catch (Exception e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+}
+}
+
+static void run(Duration timeoutMs, String... args) throws Exception {
+LeaderElectionCommandOptions commandOptions = new 
LeaderElectionCommandOptions(args);
+
+commandOptions.maybePrintHelpOrVersion();
+
+commandOptions.validate();
+ElectionType electionType = commandOptions.getElectionType();
+Optional> jsonFileTopicPartitions =
+Optional.ofNullable(commandOptions.getPathToJsonFile())
+.map(path -> parseReplicaElectionData(path));
+
+Optional topicOption = 
Optional.ofNullable(commandOptions.getTopic());
+Optional partitionOption = 
Optional.ofNullable(commandOptions.getPartition());
+final Optional> singleTopicPartition =
+(topicOption.isPresent() && partitionOption.isPresent()) ?
+Optional.of(Collections.singleton(new 
TopicPartition(topicOption.get(), partitionOption.get( :
+Optional.empty();
+
+/* Note: No need to look at --all-topic-partitions as we want this to 
be null if it is use.
+ * The validate function should be checking that this option is 
required if the --topic and --path-to-json-file
+ * are not specified.
+ */
+Optional> topicPartitions = 
jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+Properties props = new Properties();
+if (commandOptions.hasAdminClientConfig()) {
+
props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+}
+props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
commandOptions.getBootstrapServer(

[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r126617


##
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+private static final Logger LOG = 
LoggerFactory.getLogger(LeaderElectionCommand.class);
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+
+public static void main(String... args) {
+try {
+run(Duration.ofMillis(3), args);
+} catch (TerseException e) {

Review Comment:
   Removed the `TerseException`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r126278


##
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandErrorTest.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * For some error cases, we can save a little build time by avoiding the 
overhead for
+ * cluster creation and cleanup because the command is expected to fail 
immediately.
+ */
+public class LeaderElectionCommandErrorTest {
+@Test
+public void testTopicWithoutPartition() {
+String out = ToolsTestUtils.captureStandardErr(() -> 
LeaderElectionCommand.main(
+new String[] {

Review Comment:
   Remove it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1288877172


##
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+private static final Logger LOG = 
LoggerFactory.getLogger(LeaderElectionCommand.class);
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+
+public static void main(String... args) {
+try {
+run(Duration.ofMillis(3), args);
+} catch (TerseException e) {
+System.err.println(e.getMessage());
+} catch (Exception e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+}
+}
+
+static void run(Duration timeoutMs, String... args) throws Exception {
+LeaderElectionCommandOptions commandOptions = new 
LeaderElectionCommandOptions(args);
+
+commandOptions.maybePrintHelpOrVersion();
+
+commandOptions.validate();
+ElectionType electionType = commandOptions.getElectionType();
+Optional> jsonFileTopicPartitions =
+Optional.ofNullable(commandOptions.getPathToJsonFile())
+.map(path -> parseReplicaElectionData(path));
+
+Optional topicOption = 
Optional.ofNullable(commandOptions.getTopic());
+Optional partitionOption = 
Optional.ofNullable(commandOptions.getPartition());
+final Optional> singleTopicPartition =
+(topicOption.isPresent() && partitionOption.isPresent()) ?
+Optional.of(Collections.singleton(new 
TopicPartition(topicOption.get(), partitionOption.get( :
+Optional.empty();
+
+/* Note: No need to look at --all-topic-partitions as we want this to 
be null if it is use.
+ * The validate function should be checking that this option is 
required if the --topic and --path-to-json-file
+ * are not specified.
+ */
+Optional> topicPartitions = 
jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+Properties props = new Properties();
+if (commandOptions.hasAdminClientConfig()) {
+
props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+}
+props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
commandOptions.getBootstrapServer(

[jira] [Created] (KAFKA-15326) Decouple Processing Thread from Polling Thread

2023-08-09 Thread Lucas Brutschy (Jira)
Lucas Brutschy created KAFKA-15326:
--

 Summary: Decouple Processing Thread from Polling Thread
 Key: KAFKA-15326
 URL: https://issues.apache.org/jira/browse/KAFKA-15326
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: Lucas Brutschy
Assignee: Lucas Brutschy


As part of an ongoing effort to implement a better threading architecture in 
Kafka Streams, we decouple N stream threads into N polling threads and N 
processing threads. The effort to consolidate the N polling threads into a single 
thread is a follow-up to this ticket.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe commented on a diff in pull request #14169: KAFKA-15318: Update the Authorizer via AclPublisher

2023-08-09 Thread via GitHub


cmccabe commented on code in PR #14169:
URL: https://github.com/apache/kafka/pull/14169#discussion_r1288849218


##
metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java:
##
@@ -256,11 +224,6 @@ public void replay(
 throw new RuntimeException("Unable to replay " + record + " for " 
+ acl +
 ": acl not found " + "in existingAcls.");
 }
-if (!snapshotId.isPresent()) {

Review Comment:
   Yeah, it was always a messy corner case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14169: KAFKA-15318: Update the Authorizer via AclPublisher

2023-08-09 Thread via GitHub


cmccabe commented on code in PR #14169:
URL: https://github.com/apache/kafka/pull/14169#discussion_r1288848313


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1112,35 +1102,12 @@ private void appendRaftEvent(String name, Runnable 
runnable) {
 if (this != metaLogListener) {
 log.debug("Ignoring {} raft event from an old 
registration", name);
 } else {
-try {
-runnable.run();
-} finally {
-maybeCompleteAuthorizerInitialLoad();
-}
+runnable.run();
 }
 });
 }
 }
 
-private void maybeCompleteAuthorizerInitialLoad() {

Review Comment:
   Yes. I added a comment to the call to `completeInitialLoad`
   
   ```
   if (!completedInitialLoad) {
     // If we are receiving this onMetadataUpdate call, that means the MetadataLoader has
     // loaded up to the local high water mark. So we complete the initial load, enabling
     // the authorizer.
     completedInitialLoad = true
     authorizer.completeInitialLoad()
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM commented on a diff in pull request #14082: KAFKA-15102: Mirror Maker 2 - KIP690 backward compatibility

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #14082:
URL: https://github.com/apache/kafka/pull/14082#discussion_r1288843685


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -406,7 +406,6 @@ public void testReplication() throws Exception {
 "New topic was not replicated to primary cluster.");

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM commented on a diff in pull request #14082: KAFKA-15102: Mirror Maker 2 - KIP690 backward compatibility

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #14082:
URL: https://github.com/apache/kafka/pull/14082#discussion_r1288843344


##
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java:
##
@@ -43,6 +47,11 @@ public void configure(Map props) {
 log.info("Using custom remote topic separator: '{}'", separator);
 separatorPattern = Pattern.compile(Pattern.quote(separator));
 }
+
+if (props.containsKey(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG)) {
+log.info("Disable the usage of topic separator for internal 
topics");
+isInternalTopicSeparatorEnabled = 
Boolean.valueOf(props.get(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG).toString());
+}

Review Comment:
   Good shout! moved it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM commented on a diff in pull request #14082: KAFKA-15102: Mirror Maker 2 - KIP690 backward compatibility

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #14082:
URL: https://github.com/apache/kafka/pull/14082#discussion_r1288843014


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DefaultReplicationIntegrationTest.java:
##
@@ -0,0 +1,324 @@
+/*

Review Comment:
   I deleted the test. The change is small enough we can skip the integration 
test. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15298) Disable DeleteRecords on Tiered Storage topics

2023-08-09 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov resolved KAFKA-15298.
---
Resolution: Won't Fix

> Disable DeleteRecords on Tiered Storage topics
> --
>
> Key: KAFKA-15298
> URL: https://issues.apache.org/jira/browse/KAFKA-15298
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
>
> Currently the DeleteRecords API does not work with Tiered Storage. We should 
> ensure that this is reflected in the responses that clients get when trying 
> to use the API with tiered topics.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-09 Thread Jinyong Choi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752451#comment-17752451
 ] 

Jinyong Choi commented on KAFKA-15302:
--

[~mjsax] , [~guozhang] 
 
If you perform a delete() within a while() loop, it seems that due to the 
interactions of maybeEvict()->flush(), the value of a key that hasn't been 
traversed yet might be returned as stale data. Therefore, I consider this to be a 
bug.
 
The main change is as follows: I've modified the store to give the cache a hint 
about whether to call maybeEvict(), based on the current state of the RocksDB 
KeyValueStore, in particular whether it is in a snapshot state.
 
Please refer to the code snippet below for a complete view of the changes. (I 
haven't modified the test code.)

 
[https://github.com/apache/kafka/compare/trunk...jinyongchoi:kafka:KAFKA-15302-testing]
{code:java}
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
...
    @Override
    public boolean isEvictionInvocationViable() {
        return openIterators.isEmpty();
    }

streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
...
private void putInternal(final Bytes key, final byte[] value) {
        context.cache().put(
            cacheName,
            key,
            new LRUCacheEntry(
                value,
                context.headers(),
                true,
                context.offset(),
                context.timestamp(),
                context.partition(),
                context.topic()),
            wrapped().isEvictionInvocationViable());


        StoreQueryUtils.updatePosition(position, context);
    }

streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
...
public void put(final String namespace, final Bytes key, final 
LRUCacheEntry value, final boolean isEvictionViable) {
numPuts++;

final NamedCache cache = getOrCreateCache(namespace);

synchronized (cache) {
final long oldSize = cache.sizeInBytes();
cache.put(key, value);
sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
if (isEvictionViable) {
maybeEvict(namespace, cache);
}
}
}
{code}
 
After the modifications, the following thoughts arise:
 
1. It seems unnecessary to perform delete operations during traversal for 
SessionStore or TimestampedKeyValueStore, but this aspect needs documentation.
 
2. It functions as expected, but the code doesn't seem to be very clean.
 
3. Since flush() is suppressed during the while() loop, many keys are stored in 
the Cache. However, as their values are null, it appears there isn't a 
significant memory burden. Still, caution is warranted.
 
4. Due to the inhibition of flush() during the while() loop, a subsequent flush 
operation, as shown below, took 10 seconds.
While cases requiring the processing of 5,000,000 items at once are unlikely, 
this aspect also demands attention.

 
{code:java}
21:26:17.509 [...-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache -- 
Named Cache flush start dirtyKeys.size():500 entries:500
21:26:26.874 [...-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache -- 
Named Cache flush end dirtyKeys.size():0 entries:500{code}
5. If it takes time to correct it in the right direction, it might be a good 
idea to document this in advance to aid developers' understanding.

 

I'm not coming up with any better ideas.
If it takes time to make the correct modifications, I agree that we should 
update the documentation first.

> Stale value returned when using store.all() in punctuation function.
> 
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within the Punctuation function of 
> this.context.schedule, the previous value is returned. In other words, even 
> though the value has been stored from 1 to 2, it doesn't return 2; instead, 
> it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while 
> this doesn't occur 100% of the time, by adding logs, it's evident that during 
> the while loop after all() is called, the cache is flushed. As a result, the 
> named cache holds a null value, causing the return of a value from RocksDB. 
> This is observed as the value after the .get() call is different from the 
> expected value. This is possibly due to the consistent read functionality of 
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be 
> any error

[GitHub] [kafka] kamalcph commented on a diff in pull request #14114: KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs

2023-08-09 Thread via GitHub


kamalcph commented on code in PR #14114:
URL: https://github.com/apache/kafka/pull/14114#discussion_r1288729452


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -102,49 +102,14 @@ public String topicWarningMessage(String topicName) {
 
 public static class RemoteLogConfig {

Review Comment:
   Opened #14176 to address this comment. PTAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph opened a new pull request, #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-09 Thread via GitHub


kamalcph opened a new pull request, #14176:
URL: https://github.com/apache/kafka/pull/14176

   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15100) Unsafe to call tryCompleteFetchResponse on request timeout

2023-08-09 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio resolved KAFKA-15100.

Resolution: Fixed

> Unsafe to call tryCompleteFetchResponse on request timeout
> --
>
> Key: KAFKA-15100
> URL: https://issues.apache.org/jira/browse/KAFKA-15100
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.6.0, 3.4.2, 3.5.2
>
>
> When the fetch request times out the future is completed from the 
> "raft-expiration-executor" SystemTimer thread. KafkaRaftClient assumes that 
> tryCompleteFetchResponse is always called from the same thread. This 
> invariant is violated in this case.
> {code:java}
> return future.handle((completionTimeMs, exception) -> {
>     if (exception != null) {
>         Throwable cause = exception instanceof ExecutionException ?
>             exception.getCause() : exception;
>         // If the fetch timed out in purgatory, it means no new data is available,
>         // and we will complete the fetch successfully. Otherwise, if there was
>         // any other error, we need to return it.
>         Errors error = Errors.forException(cause);
>         if (error != Errors.REQUEST_TIMED_OUT) {
>             logger.info("Failed to handle fetch from {} at {} due to {}",
>                 replicaId, fetchPartition.fetchOffset(), error);
>             return buildEmptyFetchResponse(error, Optional.empty());
>         }
>     }
>     // FIXME: `completionTimeMs`, which can be null
>     logger.trace("Completing delayed fetch from {} starting at offset {} at {}",
>         replicaId, fetchPartition.fetchOffset(), completionTimeMs);
>     return tryCompleteFetchRequest(replicaId, fetchPartition, time.milliseconds());
> });
> {code}
> One solution is to always build an empty response if the future was completed 
> exceptionally. This works because the ExpirationService completes the future 
> with a `TimeoutException`.
> A longer-term solution is to use a more flexible event executor service. This 
> would be a service that allows more kinds of event to get scheduled/submitted 
> to the KRaft thread.
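
As a rough sketch of the first option above (the actual change in the linked PR may differ), the exceptional path can always answer with an empty response, so `completionTimeMs` is never dereferenced and `tryCompleteFetchRequest` is never invoked from the expiration-service thread:

{code:java}
return future.handle((completionTimeMs, exception) -> {
    if (exception != null) {
        Throwable cause = exception instanceof ExecutionException ?
            exception.getCause() : exception;
        Errors error = Errors.forException(cause);
        if (error != Errors.REQUEST_TIMED_OUT) {
            logger.info("Failed to handle fetch from {} at {} due to {}",
                replicaId, fetchPartition.fetchOffset(), error);
            return buildEmptyFetchResponse(error, Optional.empty());
        }
        // Purgatory timeout: no new data, so complete the fetch with an empty but
        // successful response instead of touching shared state from this thread.
        return buildEmptyFetchResponse(Errors.NONE, Optional.empty());
    }
    return tryCompleteFetchRequest(replicaId, fetchPartition, time.milliseconds());
});
{code}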



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15100) Unsafe to call tryCompleteFetchResponse on request timeout

2023-08-09 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-15100:
---
Fix Version/s: 3.4.2

> Unsafe to call tryCompleteFetchResponse on request timeout
> --
>
> Key: KAFKA-15100
> URL: https://issues.apache.org/jira/browse/KAFKA-15100
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.6.0, 3.4.2, 3.5.2
>
>
> When the fetch request times out the future is completed from the 
> "raft-expiration-executor" SystemTimer thread. KafkaRaftClient assumes that 
> tryCompleteFetchResponse is always called from the same thread. This 
> invariant is violated in this case.
> {code:java}
> return future.handle((completionTimeMs, exception) -> {
>     if (exception != null) {
>         Throwable cause = exception instanceof ExecutionException ?
>             exception.getCause() : exception;
>         // If the fetch timed out in purgatory, it means no new data is available,
>         // and we will complete the fetch successfully. Otherwise, if there was
>         // any other error, we need to return it.
>         Errors error = Errors.forException(cause);
>         if (error != Errors.REQUEST_TIMED_OUT) {
>             logger.info("Failed to handle fetch from {} at {} due to {}",
>                 replicaId, fetchPartition.fetchOffset(), error);
>             return buildEmptyFetchResponse(error, Optional.empty());
>         }
>     }
>     // FIXME: `completionTimeMs`, which can be null
>     logger.trace("Completing delayed fetch from {} starting at offset {} at {}",
>         replicaId, fetchPartition.fetchOffset(), completionTimeMs);
>     return tryCompleteFetchRequest(replicaId, fetchPartition, time.milliseconds());
> });
> {code}
> One solution is to always build an empty response if the future was completed 
> exceptionally. This works because the ExpirationService completes the future 
> with a `TimeoutException`.
> A longer-term solution is to use a more flexible event executor service. This 
> would be a service that allows more kinds of event to get scheduled/submitted 
> to the KRaft thread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeqo commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-09 Thread via GitHub


jeqo commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1288620620


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();

Review Comment:
   Similar here. We can initialize this and uninitializedAt in the constructor, and 
pass SystemTime in at setup time.
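
   A small sketch of that constructor-injection shape; the class and field names are illustrative and do not match the real ConsumerTask:
   
```java
import java.io.Closeable;
import org.apache.kafka.common.utils.Time;

// Illustrative only: names do not match the real ConsumerTask fields.
class TimeInjectedTask implements Runnable, Closeable {
    private final Time time;
    private final long uninitializedAt;

    TimeInjectedTask(Time time) {
        this.time = time;                        // production wiring would pass Time.SYSTEM
        this.uninitializedAt = time.milliseconds();
    }

    @Override
    public void run() { }

    @Override
    public void close() { }
}
// A test can construct it with new MockTime() and advance the clock deterministically.
```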



##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;

Review Comment:
   Is this really needed? 
   I can see the value is overwritten to 10L in the test setup, but I was able to 
run the tests successfully with the default value of 100L.
   



##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();
 
-// It indicates whether the closing process has been started or not. If it 
is set as true,
-// consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
-// determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the 
consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly 
assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+private volatile boolean isAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt = time.milliseconds();
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile 

[GitHub] [kafka] vveicc commented on pull request #14175: KAFKA-15288: Change BrokerApiVersionsCommandTest to support kraft mode

2023-08-09 Thread via GitHub


vveicc commented on PR #14175:
URL: https://github.com/apache/kafka/pull/14175#issuecomment-1671596827

   Thank you for your advice @dengziming, this is very helpful to me. I have 
made some changes, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes

2023-08-09 Thread via GitHub


clolov commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1288687969


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -347,15 +353,16 @@ public void onLeadershipChange(Set 
partitionsBecomeLeader,
 public void stopPartitions(TopicPartition topicPartition, boolean delete) {
 if (delete) {
 // Delete from internal datastructures only if it is to be deleted.
-Uuid topicIdPartition = topicPartitionIds.remove(topicPartition);
-LOGGER.debug("Removed partition: {} from topicPartitionIds", 
topicIdPartition);
+Map mapping = metaDataCache.topicNamesToIds();
+Uuid topicIdPartition = mapping.remove(topicPartition.topic());

Review Comment:
   I don't think there is anything to be done here. The topic partitions should 
disappear once the controller lets us know they should.



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -280,13 +285,18 @@ public RemoteStorageManager storageManager() {
 return remoteLogStorageManager;
 }
 
+public MetadataCache metadataCache() {
+return metaDataCache;
+}
+
 private Stream filterPartitions(Set partitions) {
 // We are not specifically checking for internal topics etc here as 
`log.remoteLogEnabled()` already handles that.
 return partitions.stream().filter(partition -> 
partition.log().exists(UnifiedLog::remoteLogEnabled));
 }
 
 private void cacheTopicPartitionIds(TopicIdPartition topicIdPartition) {
-Uuid previousTopicId = 
topicPartitionIds.put(topicIdPartition.topicPartition(), 
topicIdPartition.topicId());
+Map mapping = metaDataCache.topicNamesToIds();

Review Comment:
   Adding to Divij's comment - this is an unmodifiable map. Even if you wanted 
to, you wouldn't be able to add to it:
   ```
 def topicNamesToIds(): util.Map[String, Uuid] = {
   Collections.unmodifiableMap(metadataSnapshot.topicIds.asJava)
 }
   ```
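   
   A tiny stand-alone illustration (plain Java, not Kafka code) of what happens if a caller tries to mutate such a map:
   
```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableMapDemo {
    public static void main(String[] args) {
        Map<String, String> ids = Collections.unmodifiableMap(new HashMap<>());
        ids.remove("some-topic");   // throws UnsupportedOperationException at runtime
    }
}
```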



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15100) Unsafe to call tryCompleteFetchResponse on request timeout

2023-08-09 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-15100:
---
Fix Version/s: 3.5.2

> Unsafe to call tryCompleteFetchResponse on request timeout
> --
>
> Key: KAFKA-15100
> URL: https://issues.apache.org/jira/browse/KAFKA-15100
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.6.0, 3.5.2
>
>
> When the fetch request times out the future is completed from the 
> "raft-expiration-executor" SystemTimer thread. KafkaRaftClient assumes that 
> tryCompleteFetchResponse is always called from the same thread. This 
> invariant is violated in this case.
> {code:java}
> return future.handle((completionTimeMs, exception) -> {
>     if (exception != null) {
>         Throwable cause = exception instanceof ExecutionException ?
>             exception.getCause() : exception;
>         // If the fetch timed out in purgatory, it means no new data is available,
>         // and we will complete the fetch successfully. Otherwise, if there was
>         // any other error, we need to return it.
>         Errors error = Errors.forException(cause);
>         if (error != Errors.REQUEST_TIMED_OUT) {
>             logger.info("Failed to handle fetch from {} at {} due to {}",
>                 replicaId, fetchPartition.fetchOffset(), error);
>             return buildEmptyFetchResponse(error, Optional.empty());
>         }
>     }
>     // FIXME: `completionTimeMs`, which can be null
>     logger.trace("Completing delayed fetch from {} starting at offset {} at {}",
>         replicaId, fetchPartition.fetchOffset(), completionTimeMs);
>     return tryCompleteFetchRequest(replicaId, fetchPartition, time.milliseconds());
> });
> {code}
> One solution is to always build an empty response if the future was completed 
> exceptionally. This works because the ExpirationService completes the future 
> with a `TimeoutException`.
> A longer-term solution is to use a more flexible event executor service. This 
> would be a service that allows more kinds of event to get scheduled/submitted 
> to the KRaft thread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-09 Thread via GitHub


AndrewJSchofield commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1288667079


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -119,20 +130,24 @@ public synchronized Cluster fetch() {
  * @return remaining time in ms till the cluster info can be updated again
  */
 public synchronized long timeToAllowUpdate(long nowMs) {
-return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+return Math.max(this.lastRefreshMs + 
this.refreshBackoff.backoff(this.attempts) - nowMs, 0);
 }
 
 /**
  * The next time to update the cluster info is the maximum of the time the 
current info will expire and the time the
- * current info can be updated (i.e. backoff time has elapsed); If an 
update has been request then the expiry time
- * is now
+ * current info can be updated (i.e. backoff time has elapsed). There are 
two calculations for backing off based on
+ * how many requests with backing off have been issued, and how many 
attempts to retrieve metadata have been made
+ * since the last successful response. The first of these allows backing 
off when there are errors to do with
+ * stale metadata, even though the metadata responses are clean.
  *
  * @param nowMs current time in ms
  * @return remaining time in ms till updating the cluster info
  */
 public synchronized long timeToNextUpdate(long nowMs) {
-long timeToExpire = updateRequested() ? 0 : 
Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
-return Math.max(timeToExpire, timeToAllowUpdate(nowMs));
+long timeToUpdateWithBackoff = Math.max(this.lastRefreshMs +
+(this.backoffUpdateRequests > 0 ? 
this.refreshBackoff.backoff(this.backoffUpdateRequests - 1) : 0) - nowMs, 0);
+long timeToExpire = Math.max(this.lastSuccessfulRefreshMs + 
this.metadataExpireMs - nowMs, 0);
+return Math.max(updateRequested() ? timeToUpdateWithBackoff : 
timeToExpire, timeToAllowUpdate(nowMs));

Review Comment:
   Yes, I agree. I've spun it round to simplify it and added comments. Will 
push the commit soon.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-09 Thread via GitHub


clolov commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1288608136


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java:
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+
+import kafka.utils.EmptyTestInfo;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
+public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
+private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.class);
+
+private static final int SEG_SIZE = 1024 * 1024;
+
+private final Time time = new MockTime(1);
+private final TopicBasedRemoteLogMetadataManagerHarness 
remoteLogMetadataManagerHarness = new 
TopicBasedRemoteLogMetadataManagerHarness();
+
+private TopicBasedRemoteLogMetadataManager rlmm() {
+return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
+}
+
+@BeforeEach
+public void setup() {
+// Start the cluster only.
+remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo());
+}
+
+@AfterEach
+public void teardown() throws IOException {
+remoteLogMetadataManagerHarness.close();
+}
+
+@Test
+public void testMultiplePartitionSubscriptions() throws Exception {
+// Create topics.
+String leaderTopic = "leader";
+HashMap> assignedLeaderTopicReplicas = new 
HashMap<>();
+List leaderTopicReplicas = new ArrayList<>();
+// Set broker id 0 as the first entry which is taken as the leader.
+leaderTopicReplicas.add(0);
+leaderTopicReplicas.add(1);
+leaderTopicReplicas.add(2);
+assignedLeaderTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(leaderTopicReplicas));
+remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic,
+JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas),
+remoteLogMetadataManagerHarness.listenerName());
+
+String followerTopic = "follower";
+HashMap> assignedFollowerTopicReplicas = new 
HashMap<>();
+List followerTopicReplicas = new ArrayList<>();
+// Set broker id 1 as the first entry which is taken as the leader.
+followerTopicReplicas.add(1);
+followerTopicReplicas.add(2);
+followerTopicReplicas.add(0);
+assignedFollowerTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(followerTopicReplicas));
+remoteLogMetadataManagerHarness.createTopicWithAssignment(
+followerTopic, 
JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas),
+remoteLogMetadataManagerHarness.listenerName());
+
+String topicWithNoMessages = "no-messages-topic";
+HashMap> assignedTopicReplicas = new HashMap<>();
+List noMessagesTopicReplicas = new ArrayList<>();
+// Set broker id 1 as the first entry which is taken as the leader.
+noMessagesTopicReplicas.add(1);
+noMes

[GitHub] [kafka] jeffkbkim commented on pull request #14124: Kafka-14509; [1/2] Define ConsumerGroupDescribe API request and response schemas and classes.

2023-08-09 Thread via GitHub


jeffkbkim commented on PR #14124:
URL: https://github.com/apache/kafka/pull/14124#issuecomment-1671432782

   Thanks @riedelmax!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio merged pull request #14141: KAFKA-15100; KRaft data race with the expiration service

2023-08-09 Thread via GitHub


jsancio merged PR #14141:
URL: https://github.com/apache/kafka/pull/14141


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] prestona commented on pull request #14173: KAFKA-15042: Improve TAGGED_FIELDS protocol documentation

2023-08-09 Thread via GitHub


prestona commented on PR #14173:
URL: https://github.com/apache/kafka/pull/14173#issuecomment-1671409792

   For context, any API message description that has a TAG_BUFFER field will be 
changed as follows
   (screenshot: https://github.com/apache/kafka/assets/10883734/108fd5a9-cf6c-4698-8311-ea6845bf7886)
   
   "TAGGED_FIELDS" will have a row in the [Protocol Types 
table](https://kafka.apache.org/protocol.html#protocol_types), where the type 
will be described as:
   > Represents a sequence of tagged fields. First the length N + 1 is given as 
an UNSIGNED_VARINT. Then N tag field instances follow. A tag field is a triplet 
of a tag, a length, and data. The tag is an UNSIGNED_VARINT. The length F + 1 
is given as an UNSIGNED_VARINT. Null data is represented as a length of 0, 
otherwise F bytes of data follow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #14168: MINOR; Fix nanosecond elapsed time

2023-08-09 Thread via GitHub


divijvaidya commented on PR #14168:
URL: https://github.com/apache/kafka/pull/14168#issuecomment-1671384884

   Wow! This is news to me. BTW there is a JIRA tracking this test failure at 
https://issues.apache.org/jira/browse/KAFKA-15052 and you might want to 
associate this PR with that JIRA.
   
   We use System.nanoTime() for comparisons all over the place, such as at 
https://github.com/apache/kafka/blob/f23394336a7741bf4eb23fcde951af0a23a69bd0/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala#L573,
 
https://github.com/apache/kafka/blob/f23394336a7741bf4eb23fcde951af0a23a69bd0/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java#L231
 and many other places. Do you think those places may have the same 
overflow problem? If yes, can I request you to please create a JIRA for it?
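   
   For reference, a minimal plain-Java illustration of the overflow-safe comparison recommended by the System.nanoTime() javadoc (not Kafka code):
   
```java
public class NanoTimeComparison {
    // Safe: compares a difference, so it still works if nanoTime() wraps past Long.MAX_VALUE.
    static boolean deadlineReached(long deadlineNs) {
        return System.nanoTime() - deadlineNs >= 0;
    }

    // Overflow-prone: comparing absolute values breaks once deadlineNs itself has overflowed.
    static boolean deadlineReachedNaive(long deadlineNs) {
        return System.nanoTime() >= deadlineNs;
    }
}
```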


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #14169: KAFKA-15318: Update the Authorizer via AclPublisher

2023-08-09 Thread via GitHub


mumrah commented on code in PR #14169:
URL: https://github.com/apache/kafka/pull/14169#discussion_r1288505525


##
metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java:
##
@@ -256,11 +224,6 @@ public void replay(
 throw new RuntimeException("Unable to replay " + record + " for " 
+ acl +
 ": acl not found " + "in existingAcls.");
 }
-if (!snapshotId.isPresent()) {

Review Comment:
   Nice to see this code going away. It always tripped me up 😅 



##
core/src/main/scala/kafka/server/metadata/AclPublisher.scala:
##
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.utils.Logging
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
+import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.fault.FaultHandler
+
+import scala.concurrent.TimeoutException
+
+
+class AclPublisher(
+  id: Int,
+  faultHandler: FaultHandler,
+  nodeType: String,
+  authorizer: Option[Authorizer],
+) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
+  logIdent = s"[${name()}] "
+
+  override def name(): String = s"AclPublisher ${nodeType} id=${id}"
+
+  var completedInitialLoad = false
+
+  override def onMetadataUpdate(
+delta: MetadataDelta,
+newImage: MetadataImage,
+manifest: LoaderManifest
+  ): Unit = {
+val deltaName = s"MetadataDelta up to ${newImage.offset()}"
+
+// Apply changes to ACLs. This needs to be handled carefully because while 
we are
+// applying these changes, the Authorizer is continuing to return 
authorization
+// results in other threads. We never want to expose an invalid state. For 
example,
+// if the user created a DENY ALL acl and then created an ALLOW ACL for 
topic foo,
+// we want to apply those changes in that order, not the reverse order! 
Otherwise
+// there could be a window during which incorrect authorization results 
are returned.
+Option(delta.aclsDelta()).foreach { aclsDelta =>
+  authorizer match {
+case Some(authorizer: ClusterMetadataAuthorizer) => if 
(aclsDelta.isSnapshotDelta) {

Review Comment:
   Since we have the manifest here, we can remove this `isSnapshotDelta` from 
AclsDelta



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1112,35 +1102,12 @@ private void appendRaftEvent(String name, Runnable 
runnable) {
 if (this != metaLogListener) {
 log.debug("Ignoring {} raft event from an old 
registration", name);
 } else {
-try {
-runnable.run();
-} finally {
-maybeCompleteAuthorizerInitialLoad();
-}
+runnable.run();
 }
 });
 }
 }
 
-private void maybeCompleteAuthorizerInitialLoad() {

Review Comment:
   The equivalent logic to this is handled by MetadataLoader, right? (not 
publishing until we've reached the HWM)



##
core/src/main/scala/kafka/server/metadata/AclPublisher.scala:
##
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kaf

[GitHub] [kafka] dengziming commented on a diff in pull request #14175: KAFKA-15288: Change BrokerApiVersionsCommandTest to support kraft mode

2023-08-09 Thread via GitHub


dengziming commented on code in PR #14175:
URL: https://github.com/apache/kafka/pull/14175#discussion_r1288503732


##
core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala:
##
@@ -56,21 +66,30 @@ class BrokerApiVersionsCommandTest extends 
KafkaServerTestHarness {
 assertTrue(lineIter.hasNext)
 assertEquals(s"${bootstrapServers()} (id: 0 rack: null) -> (", 
lineIter.next())
 val nodeApiVersions = NodeApiVersions.create

Review Comment:
   We should change how we construct `NodeApiVersions` here,  such as `val 
nodeApiVersions = new 
NodeApiVersions(clientApis.map(ApiVersionsResponse.toApiVersion).asJava, 
Collections.emptyList(), false)`



##
core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala:
##
@@ -17,37 +17,47 @@
 
 package kafka.admin
 
-import java.io.{ByteArrayOutputStream, PrintStream}
-import java.nio.charset.StandardCharsets
-import scala.collection.Seq
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.NodeApiVersions
+import org.apache.kafka.common.message.ApiMessageType
 import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotNull, assertTrue}
-import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
 
   def generateConfigs: Seq[KafkaConfig] =
-TestUtils.createBrokerConfigs(1, zkConnect).map(props => {
-  // Configure control plane listener to make sure we have separate 
listeners from client,
-  // in order to avoid returning Envelope API version.
-  props.setProperty(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLLER")
-  props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, 
"CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-  props.setProperty("listeners", 
"PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
-  props.setProperty(KafkaConfig.AdvertisedListenersProp, 
"PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
-  props.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
-  props
-}).map(KafkaConfig.fromProps)
+if (isKRaftTest()) {
+  TestUtils.createBrokerConfigs(1, null).map(props => {
+props.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")

Review Comment:
   Please add a comment about this change. I guess this is related to KIP-848, 
and we can remove this once KIP-848 is all set.



##
core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala:
##
@@ -56,21 +66,30 @@ class BrokerApiVersionsCommandTest extends 
KafkaServerTestHarness {
 assertTrue(lineIter.hasNext)
 assertEquals(s"${bootstrapServers()} (id: 0 rack: null) -> (", 
lineIter.next())
 val nodeApiVersions = NodeApiVersions.create
-val enabledApis = ApiKeys.zkBrokerApis.asScala
-for (apiKey <- enabledApis) {
-  val apiVersion = nodeApiVersions.apiVersion(apiKey)
-  assertNotNull(apiVersion)
+val listenerType = if (isKRaftTest()) {
+  ApiMessageType.ListenerType.BROKER
+} else {
+  ApiMessageType.ListenerType.ZK_BROKER
+}
+val clientApis = ApiKeys.clientApis().asScala
+for (apiKey <- clientApis) {
+  assertTrue(lineIter.hasNext)
+  val actual = lineIter.next()
+  if (apiKey.inScope(listenerType)) {
+val apiVersion = nodeApiVersions.apiVersion(apiKey)
+assertNotNull(apiVersion)
 
-  val versionRangeStr =
-if (apiVersion.minVersion == apiVersion.maxVersion) 
apiVersion.minVersion.toString
-else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
-  val usableVersion = nodeApiVersions.latestUsableVersion(apiKey)
+val versionRangeStr =
+  if (apiVersion.minVersion == apiVersion.maxVersion) 
apiVersion.minVersion.toString
+  else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
+val usableVersion = nodeApiVersions.latestUsableVersion(apiKey)
 
-  val terminator = if (apiKey == enabledApis.last) "" else ","
+val terminator = if (apiKey == clientApis.last) "" else ","
 
-  val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: 
$usableVersion]$terminator"
-  assertTrue(lineIter.hasNext)
-  assertEquals(line, lineIter.next())
+val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: 
$usableVersion]$terminator"
+assertTrue(lineIter.hasNext)
+assertEquals(line, actual)
+  }

Review Comment:
   We should add an else branch.

[jira] [Created] (KAFKA-15325) Integrate topicId in OffsetFetch and OffsetCommit async consumer calls

2023-08-09 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-15325:
--

 Summary: Integrate topicId in OffsetFetch and OffsetCommit async 
consumer calls
 Key: KAFKA-15325
 URL: https://issues.apache.org/jira/browse/KAFKA-15325
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans


KIP-848 introduces support for topicIds in the OffsetFetch and OffsetCommit 
APIs. The consumer calls to those APIs should be updated to include topicIds 
when available.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kamalcph commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property

2023-08-09 Thread via GitHub


kamalcph commented on code in PR #14161:
URL: https://github.com/apache/kafka/pull/14161#discussion_r1288449342


##
core/src/main/scala/kafka/server/ConfigHandler.scala:
##
@@ -62,6 +62,12 @@ class TopicConfigHandler(private val logManager: LogManager, 
kafkaConfig: KafkaC
 topicConfig.asScala.forKeyValue { (key, value) =>
   if (!configNamesToExclude.contains(key)) props.put(key, value)
 }
+
+if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()

Review Comment:
   lgtm



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15295) Add config validation when remote storage is enabled on a topic

2023-08-09 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-15295:


Assignee: Kamal Chandraprakash  (was: Luke Chen)

> Add config validation when remote storage is enabled on a topic
> ---
>
> Key: KAFKA-15295
> URL: https://issues.apache.org/jira/browse/KAFKA-15295
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.6.0
>
>
> If system-level remote storage is not enabled, then enabling remote storage 
> on a topic should throw an exception while validating the configs. 
> See https://github.com/apache/kafka/pull/14114#discussion_r1280372441 for 
> more details
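
A hypothetical sketch of such a validation; the class and method names are illustrative and not necessarily what PR #14176 implements:

{code:java}
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;

public class RemoteStorageTopicConfigCheck {
    // Rejects topic-level remote.storage.enable=true when tiered storage is off broker-wide.
    static void validate(boolean remoteStorageSystemEnabled, Map<String, ?> topicProps) {
        Object raw = topicProps.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
        boolean topicEnablesRemoteStorage = raw != null && Boolean.parseBoolean(raw.toString());
        if (topicEnablesRemoteStorage && !remoteStorageSystemEnabled) {
            throw new ConfigException("Tiered storage is disabled in the broker, so " +
                TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG + " cannot be set to true on a topic.");
        }
    }
}
{code}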



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] nizhikov commented on a diff in pull request #14172: KAFKA-14595 Java version of ReassignPartitionsCommand POJOs

2023-08-09 Thread via GitHub


nizhikov commented on code in PR #14172:
URL: https://github.com/apache/kafka/pull/14172#discussion_r1288443925


##
tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.reassign;
+
+import java.util.Objects;
+
+/**
+ * A replica log directory move state where the move is in progress.
+ */
+final class ActiveMoveState implements LogDirMoveState {
+public final String currentLogDir;
+
+public final String targetLogDir;
+
+public final String futureLogDir;
+
+/**
+ * @param currentLogDir   The current log directory.
+ * @param futureLogDirThe log directory that the replica is moving 
to.
+ * @param targetLogDirThe log directory that we wanted the replica 
to move to.
+ */
+public ActiveMoveState(String currentLogDir, String targetLogDir, String 
futureLogDir) {
+this.currentLogDir = currentLogDir;
+this.targetLogDir = targetLogDir;
+this.futureLogDir = futureLogDir;
+}
+
+@Override
+public boolean done() {
+return false;
+}
+
+@Override
+public boolean equals(Object o) {

Review Comment:
   Yes. It is used in tests for comparison, when POJOs are keys or values of a 
collection returned by the tested code.
   
   Examples:
   ```
   class ReassignPartitionsUnitTest {
   ...
 @Test
 def testFindPartitionReassignmentStates(): Unit = {
   ...
 assertEquals((Map(
 new TopicPartition("foo", 0) -> 
PartitionReassignmentState(Seq(0,1,2), Seq(0,1,3), false),
 new TopicPartition("foo", 1) -> 
PartitionReassignmentState(Seq(1,2,3), Seq(1,2,3), true)
   ), true),
   findPartitionReassignmentStates(adminClient, Seq(
 (new TopicPartition("foo", 0), Seq(0,1,3)),
 (new TopicPartition("foo", 1), Seq(1,2,3))
   )))
   ...
 @Test
 def testFindLogDirMoveStates(): Unit = {
   ...
 assertEquals(Map(
   new TopicPartitionReplica("bar", 0, 0) -> new 
CompletedMoveState("/tmp/kafka-logs0"),
   new TopicPartitionReplica("foo", 0, 0) -> new 
ActiveMoveState("/tmp/kafka-logs0",
   "/tmp/kafka-logs1", "/tmp/kafka-logs1"),
   new TopicPartitionReplica("foo", 1, 0) -> new 
CancelledMoveState("/tmp/kafka-logs0",
 "/tmp/kafka-logs1"),
   new TopicPartitionReplica("quux", 1, 0) -> new 
MissingLogDirMoveState("/tmp/kafka-logs1"),
   new TopicPartitionReplica("quuz", 0, 0) -> new 
MissingReplicaMoveState("/tmp/kafka-logs0")
 ), findLogDirMoveStates(adminClient, Map(
   new TopicPartitionReplica("bar", 0, 0) -> "/tmp/kafka-logs0",
   new TopicPartitionReplica("foo", 0, 0) -> "/tmp/kafka-logs1",
   new TopicPartitionReplica("foo", 1, 0) -> "/tmp/kafka-logs1",
   new TopicPartitionReplica("quux", 1, 0) -> "/tmp/kafka-logs1",
   new TopicPartitionReplica("quuz", 0, 0) -> "/tmp/kafka-logs0"
 )))
   ...
   
   ```



##
tools/src/main/java/org/apache/kafka/tools/reassign/Tuple.java:
##
@@ -0,0 +1,48 @@
+/*

Review Comment:
   Removed



##
tools/src/main/java/org/apache/kafka/tools/reassign/TerseReassignmentFailureException.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.reassign;
+
+import org

[jira] [Commented] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2023-08-09 Thread Florin Akermann (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752407#comment-17752407
 ] 

Florin Akermann commented on KAFKA-12317:
-

Drafted a PR: [https://github.com/apache/kafka/pull/14174]

> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>
> Currently, for a stream-stream and stream-table/globalTable join, KafkaStreams 
> drops all stream records with a `null` key (`null` join-key for 
> stream-globalTable), because for a `null` (join) key the join is undefined: 
> i.e., we don't have an attribute to do the table lookup with (we consider the 
> stream record malformed). Note that we define the semantics of _left/outer_ 
> join as: keep the stream record if no matching join record was found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream joins though, and not drop `null`-(join)key stream 
> records, but call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join key) is `null`, we could treat it as a 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users that want to keep the current behavior can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires changing the behavior if we insert a 
> repartition topic before the join: currently, we drop `null`-key records 
> before writing into the repartition topic (as we know they would be dropped 
> later anyway). We need to relax this behavior for a left stream-table and 
> left/outer stream-stream join. Users need to be aware (i.e., we might need to 
> put this into the docs and JavaDocs) that records with a `null` key would be 
> partitioned randomly.
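
A minimal Kafka Streams sketch of the `filter()` workaround mentioned above; topic names, value types, and the joiner are illustrative:

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class NullKeyFilterBeforeJoin {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("stream-input");
        KTable<String, String> table = builder.table("table-input");
        stream
            .filter((key, value) -> key != null)   // keep today's behavior: drop null-key records explicitly
            .leftJoin(table, (streamValue, tableValue) -> streamValue + "/" + tableValue)
            .to("joined-output");
        return builder.build();
    }
}
{code}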



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vveicc opened a new pull request, #14175: KAFKA-15288: Change BrokerApiVersionsCommandTest to support kraft mode

2023-08-09 Thread via GitHub


vveicc opened a new pull request, #14175:
URL: https://github.com/apache/kafka/pull/14175

   Change BrokerApiVersionsCommandTest to support kraft mode.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #14102: KAFKA-13187: Replace EasyMock / PowerMock with Mockito in DistributedHerderTest

2023-08-09 Thread via GitHub


yashmayya commented on code in PR #14102:
URL: https://github.com/apache/kafka/pull/14102#discussion_r1288239604


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -3724,62 +2818,54 @@ public void 
testExternalZombieFencingRequestDelayedCompletion() throws Exception
 pendingFencing,
 tasksPerConnector
 );
-tasksPerConnector.keySet().forEach(c -> 
expectConfigRefreshAndSnapshot(configState));
+expectConfigRefreshAndSnapshot(configState);
+
+
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+doNothing().when(member).poll(anyLong());
 
 // The callbacks that the herder has accrued for outstanding fencing 
futures, which will be completed after
 // a successful round of fencing and a task record write to the config 
topic
-Map>> 
herderFencingCallbacks = new HashMap<>();
+Map>> 
herderFencingCallbacks = new HashMap<>();
 // The callbacks that the herder has installed for after a successful 
round of zombie fencing, but before writing
 // a task record to the config topic
-Map>> 
workerFencingFollowups = new HashMap<>();
+Map>> 
workerFencingFollowups = new HashMap<>();
 
 Map callbacksInstalled = new HashMap<>();
 tasksPerConnector.forEach((connector, numStackedRequests) -> {
 // The future returned by Worker::fenceZombies
-KafkaFuture workerFencingFuture = 
EasyMock.mock(KafkaFuture.class);
-// The future tracked by the herder (which tracks the fencing 
performed by the worker and the possible followup write to the config topic) 
-KafkaFuture herderFencingFuture = 
EasyMock.mock(KafkaFuture.class);
+KafkaFuture workerFencingFuture = mock(KafkaFuture.class);
+// The future tracked by the herder (which tracks the fencing 
performed by the worker and the possible followup write to the config topic)
+KafkaFuture herderFencingFuture = mock(KafkaFuture.class);
 
-Capture> 
herderFencingCallback = EasyMock.newCapture(CaptureType.ALL);
+ArgumentCaptor> 
herderFencingCallback = ArgumentCaptor.forClass(KafkaFuture.BiConsumer.class);
 herderFencingCallbacks.put(connector, herderFencingCallback);
 
 // Don't immediately invoke callbacks that the herder sets up for 
when the worker fencing and writes to the config topic have completed
 // Instead, wait for them to be installed, then invoke them 
explicitly after the fact on a thread separate from the herder's tick thread
-
EasyMock.expect(herderFencingFuture.whenComplete(EasyMock.capture(herderFencingCallback)))
-.andReturn(null)
-.times(numStackedRequests + 1);
+
when(herderFencingFuture.whenComplete(herderFencingCallback.capture())).thenReturn(null);
 
-Capture> fencingFollowup = 
EasyMock.newCapture();
+ArgumentCaptor> 
fencingFollowup = ArgumentCaptor.forClass(KafkaFuture.BaseFunction.class);
 CountDownLatch callbackInstalled = new CountDownLatch(1);
 workerFencingFollowups.put(connector, fencingFollowup);
 callbacksInstalled.put(connector, callbackInstalled);
-
EasyMock.expect(workerFencingFuture.thenApply(EasyMock.capture(fencingFollowup))).andAnswer(()
 -> {
+
when(workerFencingFuture.thenApply(fencingFollowup.capture())).thenAnswer(invocation
 -> {
 callbackInstalled.countDown();
 return herderFencingFuture;
 });
 
 // We should only perform a single physical zombie fencing; all 
the subsequent requests should be stacked onto the first one
-EasyMock.expect(worker.fenceZombies(
-EasyMock.eq(connector), 
EasyMock.eq(taskCountRecords.get(connector)), EasyMock.anyObject())
-).andReturn(workerFencingFuture);
-
-for (int i = 0; i < numStackedRequests; i++) {
-expectConfigRefreshAndSnapshot(configState);
-}
-
-PowerMock.replay(workerFencingFuture, herderFencingFuture);
+when(worker.fenceZombies(eq(connector), 
eq(taskCountRecords.get(connector)), any()))
+.thenReturn(workerFencingFuture)
+.thenAnswer(invocation -> {
+fail("Expected only a single zombie fencing per 
connector");

Review Comment:
   Oh, that's a good point; it would not be bubbled up correctly, in fact.
   
   Edit: I guess it would actually depend on where the extra calls are being made 
from, but either way the explicit verification is better.
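   
   A sketch of the explicit verification referred to above, reusing the mock and arguments from the surrounding test:
   
```java
// Assumes Mockito's static imports (verify, times, eq, any) already used in this test class.
verify(worker, times(1)).fenceZombies(eq(connector), eq(taskCountRecords.get(connector)), any());
```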



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr
