[GitHub] [kafka] vamossagar12 commented on pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime

2023-04-24 Thread via GitHub


vamossagar12 commented on PR #13594:
URL: https://github.com/apache/kafka/pull/13594#issuecomment-1521193360

   @yashmayya, hmm, MemoryOffsetBackingStore is used in Connect Standalone IIUC. I am not totally aware of the historical context here, i.e. whether throwing a ConnectException is valid or not. Do you reckon changing that behaviour is safe?
   Regarding the shutdown mechanism in Worker, it turns out to be what the JavaDocs provide as an example (https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ExecutorService.html), and it was pointed out in a code review [here](https://github.com/apache/kafka/pull/11955#discussion_r859425060). Since it's in Worker, I am slightly sceptical about changing this behaviour, especially since the JavaDocs suggest the two-phase shutdown pattern as an example. WDYT?
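
   For reference, the two-phase shutdown the JavaDocs suggest looks roughly like this (a minimal sketch of the JavaDocs example; the 60-second timeouts are the JavaDocs' illustrative values, not what Connect uses):
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;

   public class TwoPhaseShutdown {
       static void shutdownAndAwaitTermination(ExecutorService pool) {
           pool.shutdown(); // Phase 1: stop accepting new tasks
           try {
               // Give in-flight tasks a chance to finish.
               if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                   pool.shutdownNow(); // Phase 2: cancel lingering tasks
                   if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                       System.err.println("Pool did not terminate");
               }
           } catch (InterruptedException ie) {
               // (Re-)cancel if the current thread was interrupted while waiting.
               pool.shutdownNow();
               Thread.currentThread().interrupt();
           }
       }

       public static void main(String[] args) {
           shutdownAndAwaitTermination(Executors.newFixedThreadPool(2));
       }
   }
   ```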





[GitHub] [kafka] showuon commented on a diff in pull request #13631: KAFKA-14909: check zkMigrationReady tag before migration

2023-04-24 Thread via GitHub


showuon commented on code in PR #13631:
URL: https://github.com/apache/kafka/pull/13631#discussion_r1176030390


##
clients/src/main/java/org/apache/kafka/clients/ApiVersions.java:
##
@@ -64,4 +64,8 @@ public synchronized byte maxUsableProduceMagic() {
 return maxUsableProduceMagic;
 }
 
+// check if all nodes are ZK Migration ready
+public boolean isAllNodeZkMigrationReady() {

Review Comment:
   Agree! Moving to `QuorumFeatures` now, and adding tests to cover the non-controller node case. Thanks.
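
   Conceptually, the readiness check is an all-match over the readiness flag each quorum node advertises in its ApiVersions response; a rough self-contained sketch (the `NodeVersions` type and `zkMigrationEnabled()` accessor are illustrative stand-ins, not the real classes):
   ```java
   import java.util.Collection;
   import java.util.List;

   public class MigrationReadyCheck {
       // Stand-in for the per-node ApiVersions data the controller tracks.
       interface NodeVersions {
           boolean zkMigrationEnabled();
       }

       // True only if every known quorum node advertises migration readiness.
       static boolean isAllNodesZkMigrationReady(Collection<NodeVersions> nodes) {
           return !nodes.isEmpty() && nodes.stream().allMatch(NodeVersions::zkMigrationEnabled);
       }

       public static void main(String[] args) {
           List<NodeVersions> nodes = List.of(() -> true, () -> false);
           System.out.println(isAllNodesZkMigrationReady(nodes)); // false: one node not ready
       }
   }
   ```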






[GitHub] [kafka] showuon commented on a diff in pull request #13631: KAFKA-14909: check zkMigrationReady tag before migration

2023-04-24 Thread via GitHub


showuon commented on code in PR #13631:
URL: https://github.com/apache/kafka/pull/13631#discussion_r1176030100


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -134,8 +139,7 @@ private void initializeMigrationState() {
 }
 
 private boolean isControllerQuorumReadyForMigration() {
-// TODO implement this
-return true;
+return this.apiVersions.isAllNodeZkMigrationReady();

Review Comment:
   Good suggestion. Updated. Thanks.






[GitHub] [kafka] showuon commented on a diff in pull request #13631: KAFKA-14909: check zkMigrationReady tag before migration

2023-04-24 Thread via GitHub


showuon commented on code in PR #13631:
URL: https://github.com/apache/kafka/pull/13631#discussion_r1176029069


##
core/src/main/scala/kafka/server/ControllerServer.scala:
##
@@ -270,7 +271,8 @@ class ControllerServer(
 "zk migration",
 fatal = false,
 () => {}
-  )
+  ),
+  quorumFeatures

Review Comment:
   Feeding `quorumFeatures` into `KRaftMigrationDriver` now.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime

2023-04-24 Thread via GitHub


vamossagar12 commented on code in PR #13594:
URL: https://github.com/apache/kafka/pull/13594#discussion_r1176029043


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java:
##
@@ -216,7 +216,7 @@ public void putTopicStateRetriableFailure() {
 }).when(kafkaBasedLog).send(eq(key), valueCaptor.capture(), 
any(Callback.class));
 
 store.put(topicStatus);
-verify(kafkaBasedLog, times(2)).send(any(), any(), any());
+verify(kafkaBasedLog, timeout(300).times(2)).send(any(), any(), any());

Review Comment:
   Thanks @viktorsomogyi. I created a separate PR so that the flaky test can be fixed before the code freeze (27th April). Here is the PR: https://github.com/apache/kafka/pull/13634






[GitHub] [kafka] vamossagar12 opened a new pull request, #13634: KAFKA-14929: Fixing flaky test putTopicStateRetriableFailure

2023-04-24 Thread via GitHub


vamossagar12 opened a new pull request, #13634:
URL: https://github.com/apache/kafka/pull/13634

   Fixing the flaky test putTopicStateRetriableFailure by adding a timeout so that the background thread which invokes KafkaBasedLog#send has a chance to kick in.
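
   For illustration, Mockito's `timeout()` verification polls until the expected interaction count is reached instead of asserting immediately; a minimal self-contained sketch (the mock and values are illustrative, not the actual test):
   ```java
   import static org.mockito.Mockito.*;
   import java.util.List;

   public class TimeoutVerifyDemo {
       public static void main(String[] args) {
           @SuppressWarnings("unchecked")
           List<String> log = mock(List.class);
           // Simulate a background thread that records its work asynchronously.
           new Thread(() -> { log.add("first"); log.add("second"); }).start();
           // A plain verify() could run before the thread does; with timeout(300),
           // Mockito retries the check for up to 300 ms before failing.
           verify(log, timeout(300).times(2)).add(anyString());
           System.out.println("verified 2 async calls");
       }
   }
   ```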





[GitHub] [kafka] showuon commented on a diff in pull request #13631: KAFKA-14909: check zkMigrationReady tag before migration

2023-04-24 Thread via GitHub


showuon commented on code in PR #13631:
URL: https://github.com/apache/kafka/pull/13631#discussion_r1176027142


##
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java:
##
@@ -277,64 +312,65 @@ CompletableFuture 
enqueueMetadataChangeEventWithFuture(
 public void testOnlySendNeededRPCsToBrokers() throws Exception {
 CountingMetadataPropagator metadataPropagator = new 
CountingMetadataPropagator();
 CapturingMigrationClient migrationClient = new 
CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3)));
-KRaftMigrationDriver driver = new KRaftMigrationDriver(
+try (KRaftMigrationDriver driver = new KRaftMigrationDriver(

Review Comment:
   Using try-with-resources. Nothing else changed.
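
   For readers unfamiliar with the idiom: try-with-resources guarantees `close()` runs even when an assertion inside the block throws. A self-contained illustration (not the actual test):
   ```java
   public class TryWithResourcesDemo {
       static class FakeDriver implements AutoCloseable {
           @Override
           public void close() {
               // Runs on both normal exit and exception paths.
               System.out.println("driver closed");
           }
       }

       public static void main(String[] args) {
           try (FakeDriver driver = new FakeDriver()) {
               System.out.println("running assertions with " + driver);
           }
       }
   }
   ```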






[GitHub] [kafka] showuon commented on a diff in pull request #13631: KAFKA-14909: check zkMigrationReady tag before migration

2023-04-24 Thread via GitHub


showuon commented on code in PR #13631:
URL: https://github.com/apache/kafka/pull/13631#discussion_r1176025802


##
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java:
##
@@ -383,8 +466,8 @@ public ZkMigrationLeadershipState 
claimControllerLeadership(ZkMigrationLeadershi
 driver.onMetadataUpdate(delta, image, new 
LogDeltaManifest(provenance,
 new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));
 Assertions.assertTrue(claimLeaderAttempts.await(1, 
TimeUnit.MINUTES));
-TestUtils.waitForCondition(() -> driver.migrationState().get(1, 
TimeUnit.MINUTES).equals(MigrationDriverState.ZK_MIGRATION),
-"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION 
state");
+TestUtils.waitForCondition(() -> driver.migrationState().get(1, 
TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

Review Comment:
   This test failed sometimes because `ZK_MIGRATION` is just an intermediate state, so we might miss observing it, causing the test to fail. Changing the check to verify the `DUAL_WRITE` state, which is more reliable. This also implicitly verifies the `ZK_MIGRATION` state, because we can't jump to `DUAL_WRITE` directly.






[GitHub] [kafka] hgeraldino commented on pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest

2023-04-24 Thread via GitHub


hgeraldino commented on PR #13383:
URL: https://github.com/apache/kafka/pull/13383#issuecomment-1521090615

   Thanks for your feedback @C0urante. 
   
   I finally had time this past week and tweaked the test to address your 
comments and fix coverage. 





[GitHub] [kafka] hgeraldino commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest

2023-04-24 Thread via GitHub


hgeraldino commented on code in PR #13383:
URL: https://github.com/apache/kafka/pull/13383#discussion_r1175960799


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##
@@ -474,129 +442,107 @@ public void testFailureInPollAfterStop() throws 
Exception {
 taskFuture.get();
 assertPollMetrics(0);
 
-PowerMock.verifyAll();
+verifyCleanStartup();
+verify(statusListener).onShutdown(taskId);
+verify(sourceTask).stop();
+verifyOffsetFlush(true);
+verifyClose();
 }
 
 @Test
 public void testPollReturnsNoRecords() throws Exception {
 // Test that the task handles an empty list of records
 createWorkerTask();
 
-expectCleanStartup();
-
 // We'll wait for some data, then trigger a flush
-final CountDownLatch pollLatch = expectEmptyPolls(1, new 
AtomicInteger());
-expectEmptyOffsetFlush();
-
-sourceTask.stop();
-EasyMock.expectLastCall();
-expectEmptyOffsetFlush();
-
-statusListener.onShutdown(taskId);
-EasyMock.expectLastCall();
-
-expectClose();
-
-PowerMock.replayAll();
+final CountDownLatch pollLatch = expectEmptyPolls(new AtomicInteger());
+expectOffsetFlush(true, false);
 
 workerTask.initialize(TASK_CONFIG);
 Future<?> taskFuture = executor.submit(workerTask);
 
 assertTrue(awaitLatch(pollLatch));
 assertTrue(workerTask.commitOffsets());
+verify(offsetWriter).beginFlush(anyLong(), any(TimeUnit.class));
+reset(offsetWriter);
+
 workerTask.stop();
 assertTrue(workerTask.awaitStop(1000));
+verify(offsetWriter).beginFlush(anyLong(), any(TimeUnit.class));
+verifyNoMoreInteractions(offsetWriter);
 
 taskFuture.get();
 assertPollMetrics(0);
 
-PowerMock.verifyAll();
+verifyCleanStartup();
+verify(sourceTask).stop();
+verify(statusListener).onShutdown(taskId);
+verifyClose();
 }
 
 @Test
 public void testCommit() throws Exception {
 // Test that the task commits properly when prompted
 createWorkerTask();
 
-expectCleanStartup();
-
 // We'll wait for some data, then trigger a flush
 final CountDownLatch pollLatch = expectPolls(1);
-expectOffsetFlush(true);
-
-offsetWriter.offset(PARTITION, OFFSET);

Review Comment:
   Not anymore






[GitHub] [kafka] hgeraldino commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest

2023-04-24 Thread via GitHub


hgeraldino commented on code in PR #13383:
URL: https://github.com/apache/kafka/pull/13383#discussion_r1175960401


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##
@@ -1003,31 +868,35 @@ private boolean awaitLatch(CountDownLatch latch) {
 return false;
 }
 
-@SuppressWarnings("unchecked")
 private void expectOffsetFlush(boolean succeed) throws Exception {
-EasyMock.expect(offsetWriter.beginFlush(EasyMock.anyLong(), 
EasyMock.anyObject())).andReturn(true);
-Future<Void> flushFuture = PowerMock.createMock(Future.class);
-EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture);
+expectOffsetFlush(succeed, true);
+}
+
+@SuppressWarnings("unchecked")
+private void expectOffsetFlush(boolean succeed, boolean offsetFlush, 
Boolean... remainingVals) throws Exception {
+when(offsetWriter.beginFlush(anyLong(), 
any(TimeUnit.class))).thenReturn(offsetFlush, remainingVals);
+
+if (offsetFlush) {

Review Comment:
   This was definitely wrong. I re-implemented this logic, splitting the `beginFlush` and `doFlush` calls into separate util methods.
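
   For context, a rough self-contained sketch of what splitting the stubbing into separate helpers can look like (the `OffsetWriter`/`Callback` stand-ins and helper names here are hypothetical, not the actual Connect types or final test code):
   ```java
   import static org.mockito.Mockito.*;

   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;

   public class SplitFlushStubsDemo {
       // Minimal stand-ins for the types involved (illustrative only).
       interface Callback<V> { void onCompletion(Throwable error, V result); }
       interface OffsetWriter {
           boolean beginFlush(long timeout, TimeUnit unit) throws Exception;
           Future<Void> doFlush(Callback<Void> callback);
       }

       static OffsetWriter offsetWriter = mock(OffsetWriter.class);

       // Stub only the begin-flush handshake; callers decide whether it succeeds.
       static void expectBeginFlush(Boolean first, Boolean... rest) throws Exception {
           when(offsetWriter.beginFlush(anyLong(), any(TimeUnit.class))).thenReturn(first, rest);
       }

       // Stub the flush itself, completing the callback with success or failure.
       @SuppressWarnings("unchecked")
       static void expectDoFlush(boolean succeed) {
           when(offsetWriter.doFlush(any(Callback.class))).thenAnswer(inv -> {
               Callback<Void> cb = inv.getArgument(0);
               cb.onCompletion(succeed ? null : new RuntimeException("flush failed"), null);
               return mock(Future.class);
           });
       }

       public static void main(String[] args) throws Exception {
           expectBeginFlush(true);
           expectDoFlush(true);
           System.out.println("beginFlush -> " + offsetWriter.beginFlush(1, TimeUnit.SECONDS));
           offsetWriter.doFlush((err, res) -> System.out.println("flush error: " + err));
       }
   }
   ```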






[GitHub] [kafka] atu-sharm opened a new pull request, #13633: KAFKA-14839: Exclude protected variable from JavaDocs

2023-04-24 Thread via GitHub


atu-sharm opened a new pull request, #13633:
URL: https://github.com/apache/kafka/pull/13633

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
   Adding rules to display only the public API.





[GitHub] [kafka] maulin-vasavada commented on pull request #13626: MINOR: only set sslEngine#setUseClientMode to false once when ssl mode is server

2023-04-24 Thread via GitHub


maulin-vasavada commented on PR #13626:
URL: https://github.com/apache/kafka/pull/13626#issuecomment-1520995629

   @machi1990 looks good to me. However, you will need a committer to review and merge this.





[GitHub] [kafka] mjsax commented on pull request #13533: KAFKA-12446: update change encoding to use varint

2023-04-24 Thread via GitHub


mjsax commented on PR #13533:
URL: https://github.com/apache/kafka/pull/13533#issuecomment-1520912631

   Merged to `trunk` and cherry-picked to `3.5` branch.





[GitHub] [kafka] mjsax merged pull request #13533: KAFKA-12446: update change encoding to use varint

2023-04-24 Thread via GitHub


mjsax merged PR #13533:
URL: https://github.com/apache/kafka/pull/13533





[GitHub] [kafka] jsancio closed pull request #13030: MINOR; Add log message when Log high watermark differs

2023-04-24 Thread via GitHub


jsancio closed pull request #13030: MINOR; Add log message when Log high 
watermark differs
URL: https://github.com/apache/kafka/pull/13030





[GitHub] [kafka] jsancio commented on pull request #13030: MINOR; Add log message when Log high watermark differs

2023-04-24 Thread via GitHub


jsancio commented on PR #13030:
URL: https://github.com/apache/kafka/pull/13030#issuecomment-1520845699

   Changes included in https://github.com/apache/kafka/pull/13553





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13538: KAFKA-14462; [8/N] Add ConsumerGroupMember

2023-04-24 Thread via GitHub


jeffkbkim commented on code in PR #13538:
URL: https://github.com/apache/kafka/pull/13538#discussion_r1175790304


##
checkstyle/suppressions.xml:
##
@@ -318,6 +318,14 @@
 
 
+
+
+

[jira] [Updated] (KAFKA-14932) Heuristic for increasing the log start offset after replicas are caught up

2023-04-24 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14932:
---
Description: 
The implementation in [https://github.com/apache/kafka/pull/9816] increases the log start offset as soon as a snapshot is created that is greater than the log start offset. This is correct but causes some inefficiency in some cases.
 # Any follower, voter or observer, with an end offset between the leader's log start offset and the leader's latest snapshot will get invalidated. This will cause those followers to fetch the new snapshot and reload their state machine.
 # Any {{Listener}} or state machine that has a {{nextExpectedOffset()}} less than the latest snapshot will get invalidated. This will cause the state machine to reload its state from the latest snapshot.

To minimize the frequency of these reloads, KIP-630 proposes adding the following configuration:
 * {{metadata.start.offset.lag.time.max.ms}} - The maximum amount of time that the leader will wait for an offset to get replicated to all of the live replicas before advancing the {{LogStartOffset}}. See the section “When to Increase the LogStartOffset”. The default is 7 days.

This description and implementation should be extended to also apply to the state machine, or {{Listener}}. The local log start offset should be increased when all of the {{ListenerContext}}s' {{nextExpectedOffset()}} values are greater than the offset of the latest snapshot.

I should point out that this logic is slightly different when the replica is a leader vs. a follower.
 # The leader should only advance the log start offset if:
 ## all of the followers have fetched past a snapshot,
 ## all of the Listeners have read past a snapshot,
 ## or there is a timeout.
 # Followers should only advance the log start offset to the leader's log start offset if:
 ## there is a local snapshot greater than the leader's log start offset, and
 ## all of the Listeners have read past the leader's log start offset.

Another requirement is that the log start offset must always be zero or equal to the end offset of another snapshot. This is needed so that the raft client can know the epoch of the offset prior to the log start offset. In practice this means that the topic partition log will have 2 snapshots when the log start offset is greater than 0.

This can be implemented by changing {{ReplicatedLog::startOffset}} to:
{code:java}
OffsetAndEpoch startOffsetAndEpoch(); {code}
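
A minimal sketch of the proposed interface change (illustrative only; assumes the existing {{OffsetAndEpoch}} value type):
{code:java}
public interface ReplicatedLog {
    // Replaces startOffset(): callers get the start offset together with the
    // epoch of the record at that offset, which lets them validate the log
    // start against a snapshot's end offset and epoch.
    OffsetAndEpoch startOffsetAndEpoch();
}
{code}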

  was:
The implementation in [https://github.com/apache/kafka/pull/9816] increases the log start offset as soon as a snapshot is created that is greater than the log start offset. This is correct but causes some inefficiency in some cases.
 # Any follower, voter or observer, with an end offset between the leader's log start offset and the leader's latest snapshot will get invalidated. This will cause those followers to fetch the new snapshot and reload their state machine.
 # Any {{Listener}} or state machine that has a {{nextExpectedOffset()}} less than the latest snapshot will get invalidated. This will cause the state machine to reload its state from the latest snapshot.

To minimize the frequency of these reloads, KIP-630 proposes adding the following configuration:
 * {{metadata.start.offset.lag.time.max.ms}} - The maximum amount of time that the leader will wait for an offset to get replicated to all of the live replicas before advancing the {{LogStartOffset}}. See the section “When to Increase the LogStartOffset”. The default is 7 days.

This description and implementation should be extended to also apply to the state machine, or {{Listener}}. The local log start offset should be increased when all of the {{ListenerContext}}s' {{nextExpectedOffset()}} values are greater than the offset of the latest snapshot.


> Heuristic for increasing the log start offset after replicas are caught up
> --
>
> Key: KAFKA-14932
> URL: https://issues.apache.org/jira/browse/KAFKA-14932
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
>

[jira] [Created] (KAFKA-14932) Heuristic for increasing the log start offset after replicas are caught up

2023-04-24 Thread Jira
José Armando García Sancio created KAFKA-14932:
--

 Summary: Heuristic for increasing the log start offset after 
replicas are caught up
 Key: KAFKA-14932
 URL: https://issues.apache.org/jira/browse/KAFKA-14932
 Project: Kafka
  Issue Type: Sub-task
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio


The implementation in [https://github.com/apache/kafka/pull/9816] increases the log start offset as soon as a snapshot is created that is greater than the log start offset. This is correct but causes some inefficiency in some cases.
 # Any follower, voter or observer, with an end offset between the leader's log start offset and the leader's latest snapshot will get invalidated. This will cause those followers to fetch the new snapshot and reload their state machine.
 # Any {{Listener}} or state machine that has a {{nextExpectedOffset()}} less than the latest snapshot will get invalidated. This will cause the state machine to reload its state from the latest snapshot.

To minimize the frequency of these reloads, KIP-630 proposes adding the following configuration:
 * {{metadata.start.offset.lag.time.max.ms}} - The maximum amount of time that the leader will wait for an offset to get replicated to all of the live replicas before advancing the {{LogStartOffset}}. See the section “When to Increase the LogStartOffset”. The default is 7 days.

This description and implementation should be extended to also apply to the state machine, or {{Listener}}. The local log start offset should be increased when all of the {{ListenerContext}}s' {{nextExpectedOffset()}} values are greater than the offset of the latest snapshot.





[GitHub] [kafka] mjsax commented on pull request #13615: KAFKA-14834: [12/N] Minor code cleanups relating to versioned stores

2023-04-24 Thread via GitHub


mjsax commented on PR #13615:
URL: https://github.com/apache/kafka/pull/13615#issuecomment-1520833481

   Merged to `trunk` and cherry-picked to `3.5`.





[GitHub] [kafka] mjsax merged pull request #13615: KAFKA-14834: [12/N] Minor code cleanups relating to versioned stores

2023-04-24 Thread via GitHub


mjsax merged PR #13615:
URL: https://github.com/apache/kafka/pull/13615





[GitHub] [kafka] mjsax commented on pull request #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions

2023-04-24 Thread via GitHub


mjsax commented on PR #13592:
URL: https://github.com/apache/kafka/pull/13592#issuecomment-1520757094

   Merged to `trunk` and cherry-picked to `3.5` and `3.4` branches.





[GitHub] [kafka] cadonna commented on a diff in pull request #13621: KAFKA-14133: Migrate ChangeLogReader mock in TaskManagerTest to Mockito

2023-04-24 Thread via GitHub


cadonna commented on code in PR #13621:
URL: https://github.com/apache/kafka/pull/13621#discussion_r1175663484


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2602,23 +2608,21 @@ public void shouldUpdateInputPartitionsAfterRebalance() 
{
 assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
 assertThat(task00.state(), is(Task.State.RUNNING));
 assertEquals(newPartitionsSet, task00.inputPartitions());
-verify(activeTaskCreator, consumer, changeLogReader);
+verify(activeTaskCreator, consumer);
 }
 
 @Test
 public void shouldAddNewActiveTasks() {
 final Map<TaskId, Set<TopicPartition>> assignment = taskId00Assignment;
 final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, 
true);
 
-expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
 expect(consumer.assignment()).andReturn(emptySet());
 consumer.resume(eq(emptySet()));
 expectLastCall();
-changeLogReader.enforceRestoreActive();
 expectLastCall();

Review Comment:
   This line can also be removed.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2653,15 +2658,13 @@ public void initializeIfNeeded() {
 
 consumer.commitSync(Collections.emptyMap());
 expectLastCall();
-expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
 expect(consumer.assignment()).andReturn(emptySet());
 consumer.resume(eq(emptySet()));
 expectLastCall();
-changeLogReader.enforceRestoreActive();
 expectLastCall();

Review Comment:
   This line can also be removed.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -3045,17 +3048,14 @@ public void closeDirty() {
 }
 };
 
-resetToStrict(changeLogReader);
-changeLogReader.enforceRestoreActive();
-expect(changeLogReader.completedChangelogs()).andReturn(emptySet());

Review Comment:
   Since the mock was reset to strict, I think you need to verify or explicitly stub the call to `changeLogReader.completedChangelogs()`. Otherwise, the test would not fail if `completedChangelogs()` were not called.
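
   For example, with Mockito the equivalent explicit stub-plus-verify could look roughly like this self-contained sketch (the `ChangelogReader` stand-in is illustrative, not the real interface):
   ```java
   import static java.util.Collections.emptySet;
   import static org.mockito.Mockito.*;

   import java.util.Set;

   public class StrictStubDemo {
       // Stand-in for the changelog-reader dependency (illustrative only).
       interface ChangelogReader {
           Set<String> completedChangelogs();
       }

       public static void main(String[] args) {
           ChangelogReader changeLogReader = mock(ChangelogReader.class);
           // Stub the call the code under test is expected to make...
           when(changeLogReader.completedChangelogs()).thenReturn(emptySet());

           changeLogReader.completedChangelogs(); // stand-in for the code under test

           // ...and verify it actually happened, so the test still fails
           // if completedChangelogs() is never invoked.
           verify(changeLogReader).completedChangelogs();
           System.out.println("verified");
       }
   }
   ```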



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -3168,16 +3167,13 @@ public Set<TopicPartition> changelogPartitions() {
 }
 };
 
-resetToStrict(changeLogReader);
-changeLogReader.enforceRestoreActive();
-expect(changeLogReader.completedChangelogs()).andReturn(emptySet());

Review Comment:
   Same here



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -3117,16 +3118,13 @@ public Set<TopicPartition> changelogPartitions() {
 final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
 task00.setCommittableOffsetsAndMetadata(offsets);
 
-resetToStrict(changeLogReader);
-changeLogReader.enforceRestoreActive();
-expect(changeLogReader.completedChangelogs()).andReturn(emptySet());

Review Comment:
   Same here.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -3297,16 +3294,13 @@ public void suspend() {
 }
 };
 
-resetToStrict(changeLogReader);
-changeLogReader.enforceRestoreActive();
-expect(changeLogReader.completedChangelogs()).andReturn(emptySet());

Review Comment:
   Same here






[GitHub] [kafka] jeffkbkim commented on pull request #13600: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-24 Thread via GitHub


jeffkbkim commented on PR #13600:
URL: https://github.com/apache/kafka/pull/13600#issuecomment-1520740394

   I see a different error now:
   ```
   
   [2023-04-24T17:51:21.714Z] FAILURE: Build failed with an exception.
   
   [2023-04-24T17:51:21.714Z] 
   
   [2023-04-24T17:51:21.714Z] * What went wrong:
   
   [2023-04-24T17:51:21.714Z] Execution failed for task 
':server-common:spotbugsMain'.
   
   [2023-04-24T17:51:21.714Z] > A failure occurred while executing 
com.github.spotbugs.snom.internal.SpotBugsRunnerForWorker$SpotBugsExecutor
   
   [2023-04-24T17:51:21.714Z]> Failed to run Gradle Worker Daemon
   
   [2023-04-24T17:51:21.714Z]   > Process 'Gradle Worker Daemon 4' finished 
with non-zero exit value 1
   ```





[GitHub] [kafka] jeffkbkim commented on pull request #13602: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-24 Thread via GitHub


jeffkbkim commented on PR #13602:
URL: https://github.com/apache/kafka/pull/13602#issuecomment-1520737975

   The build looks to be successful.





[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-24 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1175718263


##
metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java:
##
@@ -115,6 +115,9 @@ private void publishDelta(MetadataDelta delta) {
 }
 }
 changes.apply(metrics);
+if (delta.featuresDelta() != null) {

Review Comment:
   That's fine, although it could also be a future improvement.






[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-24 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1175717268


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1123,29 +1165,104 @@ private void claim(int epoch) {
 // Prepend the activate event. It is important that this event go 
at the beginning
 // of the queue rather than the end (hence prepend rather than 
append). It's also
 // important not to use prepend for anything else, to preserve the 
ordering here.
-queue.prepend(new ControllerWriteEvent<>("completeActivation[" + 
epoch + "]",
-new CompleteActivationEvent()));
+ControllerWriteEvent<Void> activationEvent = new ControllerWriteEvent<>("completeActivation[" + epoch + "]",
+new CompleteActivationEvent(),
+EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, 
RUNS_IN_PREMIGRATION));
+activationEvent.future.whenComplete((__, t) -> {
+if (t != null) {
+fatalFaultHandler.handleFault("exception while activating 
controller", t);

Review Comment:
   Yes, I recall now. We always hit the fault handler if `replay` fails, but other issues just trigger a resignation and failover. Since this is a bit special, it makes sense to specifically invoke the fault handler.
   
   Should we use `exceptionally` here rather than `whenComplete`?
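
   For comparison, a minimal illustration of the difference (plain `CompletableFuture` behaviour, not controller code): `whenComplete` observes both success and failure without changing the outcome, while `exceptionally` runs only on failure and must supply a fallback value.
   ```java
   import java.util.concurrent.CompletableFuture;

   public class WhenCompleteDemo {
       public static void main(String[] args) {
           CompletableFuture<Void> f = new CompletableFuture<>();

           // whenComplete sees (result, throwable) on both success and failure.
           f.whenComplete((r, t) -> {
               if (t != null) System.out.println("whenComplete saw: " + t.getMessage());
           });

           // exceptionally only fires on failure and must return a fallback value.
           f.exceptionally(t -> {
               System.out.println("exceptionally saw: " + t.getMessage());
               return null;
           });

           f.completeExceptionally(new RuntimeException("activation failed"));
       }
   }
   ```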






[GitHub] [kafka] mjsax commented on a diff in pull request #13533: KAFKA-12446: update change encoding to use varint

2023-04-24 Thread via GitHub


mjsax commented on code in PR #13533:
URL: https://github.com/apache/kafka/pull/13533#discussion_r1175716345


##
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##
@@ -543,6 +543,16 @@ public void 
shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMul
 
 @Test
 public void shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() 
throws Exception {
+/*
+Gradle Test Run :streams:unitTest > Gradle Test Executor 19 > 
NamedTopologyIntegrationTest > 
shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() FAILED
+java.lang.AssertionError:
+Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]>
+ but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]>
+at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
+at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
+at 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing(NamedTopologyIntegrationTest.java:563)
+
+ */

Review Comment:
   Oops. This should not have gone into this PR. Good catch.
   
   The test is flaky and this was a note to myself, meant for my local copy only.






[jira] [Updated] (KAFKA-14862) Outer stream-stream join does not output all results with multiple input partitions

2023-04-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14862:

Affects Version/s: 3.1.0

> Outer stream-stream join does not output all results with multiple input 
> partitions
> ---
>
> Key: KAFKA-14862
> URL: https://issues.apache.org/jira/browse/KAFKA-14862
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Bruno Cadonna
>Assignee: Matthias J. Sax
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
>
> If I execute the following Streams app once with two input topics each with 1 
> partition and then with input topics each with two partitions, I get 
> different results.
>   
> {code:java}
> final KStream<String, String> leftSide = builder.stream(leftSideTopic);
> final KStream<String, String> rightSide = builder.stream(rightSideTopic);
> final KStream<String, String> leftAndRight = leftSide.outerJoin(
> rightSide,
> (leftValue, rightValue) ->
> (rightValue == null) ? leftValue + "/NOTPRESENT": leftValue + "/" + 
> rightValue,
> JoinWindows.ofTimeDifferenceAndGrace(
> Duration.ofSeconds(20), 
> Duration.ofSeconds(10)),
> StreamJoined.with(
> Serdes.String(), /* key */
> Serdes.String(), /* left value */
> Serdes.String()  /* right value */
> ));
> leftAndRight.print(Printed.toSysOut());
> {code}
> To reproduce, produce twice the following batch of records with an interval 
> greater than window + grace period (i.e. > 30 seconds) in between the two 
> batches:
> {code}
> (0, 0)
> (1, 1)
> (2, 2)
> (3, 3)
> (4, 4)
> (5, 5)
> (6, 6)
> (7, 7)
> (8, 8)
> (9, 9)
> {code}
> With input topics with 1 partition I get:
> {code}
> [KSTREAM-PROCESSVALUES-08]: 0, 0/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 1, 1/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 2, 2/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 3, 3/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 4, 4/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 5, 5/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 6, 6/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 7, 7/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 8, 8/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 9, 9/NOTPRESENT
> {code}
> With input topics with 2 partitions I get:
> {code}
> [KSTREAM-PROCESSVALUES-08]: 1, 1/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 3, 3/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 4, 4/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 7, 7/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 8, 8/NOTPRESENT
> [KSTREAM-PROCESSVALUES-08]: 9, 9/NOTPRESENT
> {code}
> I would expect to get the same set of records, maybe in a different order due 
> to the partitioning.





[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-24 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1175710520


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -869,19 +906,30 @@ public CompletableFuture<Void> acceptBatch(List<ApiMessageAndVersion> recordBatch)
 return future;
 }
 ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>("ZK Migration Batch",
-new MigrationWriteOperation(recordBatch));
+new MigrationWriteOperation(recordBatch), 
EnumSet.of(RUNS_IN_PREMIGRATION));
 queue.append(batchEvent);
 return batchEvent.future;
 }
 
 @Override
-public OffsetAndEpoch completeMigration() {
-// TODO write migration record, use KIP-868 transaction
-return highestMigrationRecordOffset;
+public CompletableFuture<OffsetAndEpoch> completeMigration() {
+log.info("Completing ZK Migration");
+// TODO use KIP-868 transaction
+ControllerWriteEvent<Void> event = new ControllerWriteEvent<>("Complete ZK Migration",
+new MigrationWriteOperation(
+Collections.singletonList(
+new ApiMessageAndVersion(
+new 
ZkMigrationStateRecord().setZkMigrationState(ZkMigrationState.MIGRATION.value()),
+ZkMigrationStateRecord.LOWEST_SUPPORTED_VERSION)
+)),
+EnumSet.of(RUNS_IN_PREMIGRATION));
+queue.append(event);
+return event.future.thenApply(__ -> highestMigrationRecordOffset);
 }
 
 @Override
 public void abortMigration() {
+log.error("Aborting ZK Migration");

Review Comment:
   If you really want to leave this for future work (i.e., not this PR), we can. Although we'll need to do something here for 3.5, I think...






[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-24 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1175709187


##
core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala:
##
@@ -145,25 +146,29 @@ class BrokerRegistrationRequestTest {
   }
 
   @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, 
metadataVersion = MetadataVersion.IBP_3_3_IV3,
-serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "true")))
-  def testRegisterZkWithKRaftOldMetadataVersion(clusterInstance: 
ClusterInstance): Unit = {
+serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "false")))

Review Comment:
   > I added a test in QuorumControllerTest
   
   ok
   
   > we don't expose the fault handler outside of TestKit (which maybe we 
should)
   
   we don't have to do it now, but perhaps in the future we should add an 
accessor for the fault handler.






[GitHub] [kafka] mjsax merged pull request #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions

2023-04-24 Thread via GitHub


mjsax merged PR #13592:
URL: https://github.com/apache/kafka/pull/13592





[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-24 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1175709187


##
core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala:
##
@@ -145,25 +146,29 @@ class BrokerRegistrationRequestTest {
   }
 
   @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, 
metadataVersion = MetadataVersion.IBP_3_3_IV3,
-serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "true")))
-  def testRegisterZkWithKRaftOldMetadataVersion(clusterInstance: 
ClusterInstance): Unit = {
+serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "false")))

Review Comment:
   ok






[GitHub] [kafka] cmccabe commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-24 Thread via GitHub


cmccabe commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1175709047


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -324,4 +329,29 @@ class ZkMigrationIntegrationTest {
   assertTrue(firstProducerIdBlock.firstProducerId() < 
producerIdBlock.firstProducerId())
 }
   }
+
+  /**
+   * Start a KRaft cluster with migrations enabled, verify that the controller 
does not accept metadata changes
+   * through the RPCs
+   */
+  @ClusterTests(Array(
+new ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, 
metadataVersion = MetadataVersion.IBP_3_4_IV0,
+  serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "true"))),
+new ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, 
metadataVersion = MetadataVersion.IBP_3_5_IV0,
+  serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "true"))),
+new ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, 
metadataVersion = MetadataVersion.IBP_3_5_IV1,
+  serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "true")))
+  ))
+  def testPreMigrationMode(clusterInstance: ClusterInstance): Unit = {

Review Comment:
   Thanks.
   
   ```
   Reason: The active controller appears to be node 3000
   ```
   
   Can you change it so that this talks about pre-migration?






[GitHub] [kafka] mjsax commented on a diff in pull request #13622: KAFKA-14834: [13/N] Docs updates for versioned store semantics

2023-04-24 Thread via GitHub


mjsax commented on code in PR #13622:
URL: https://github.com/apache/kafka/pull/13622#discussion_r1175694673


##
docs/streams/core-concepts.html:
##
@@ -328,13 +328,17 @@ <
 for stateful operations such as aggregations and joins, however, 
out-of-order data could cause the processing logic to be incorrect. If users 
want to handle such out-of-order data, generally they need to allow their 
applications
 to wait for longer time while bookkeeping their states during the wait 
time, i.e. making trade-off decisions between latency, cost, and correctness.
 In Kafka Streams specifically, users can configure their window 
operators for windowed aggregations to achieve such trade-offs (details can be 
found in Developer 
Guide).
-As for Joins, users have to be aware that some of the out-of-order 
data cannot be handled by increasing on latency and cost in Streams yet:
+As for Joins, users may use versioned
 state stores to address concerns with out-of-order data, but out-of-order 
data will not be handled by default:
 
 
 
- For Stream-Stream joins, all three types (inner, outer, left) 
handle out-of-order records correctly, but the resulted stream may contain 
unnecessary leftRecord-null for left joins, and leftRecord-null or 
null-rightRecord for outer joins. 
- For Stream-Table joins, out-of-order records are not handled 
(i.e., Streams applications don't check for out-of-order records and just 
process all records in offset order), and hence it may produce unpredictable 
results. 
- For Table-Table joins, out-of-order records are not handled 
(i.e., Streams applications don't check for out-of-order records and just 
process all records in offset order). However, the join result is a changelog 
stream and hence will be eventually consistent. 
+ For Stream-Stream joins, all three types (inner, outer, left) 
handle out-of-order records correctly, but the resulting stream may contain 
unnecessary leftRecord-null for left joins, and leftRecord-null or 
null-rightRecord for outer joins.

Review Comment:
   > but the resulting stream may contain unnecessary leftRecord-null for left 
joins, and leftRecord-null or null-rightRecord for outer joins.
   
   Not true any longer. We have the spurious-null fix -- would you mind doing a follow-up PR for 3.4 so we can fix the docs there (and older versions -- I need to dig out the KIP to see which versions need this update)?



##
docs/streams/core-concepts.html:
##
@@ -328,13 +328,17 @@ <
 for stateful operations such as aggregations and joins, however, 
out-of-order data could cause the processing logic to be incorrect. If users 
want to handle such out-of-order data, generally they need to allow their 
applications
 to wait for longer time while bookkeeping their states during the wait 
time, i.e. making trade-off decisions between latency, cost, and correctness.
 In Kafka Streams specifically, users can configure their window 
operators for windowed aggregations to achieve such trade-offs (details can be 
found in Developer 
Guide).
-As for Joins, users have to be aware that some of the out-of-order 
data cannot be handled by increasing on latency and cost in Streams yet:
+As for Joins, users may use versioned
 state stores to address concerns with out-of-order data, but out-of-order 
data will not be handled by default:
 
 
 
- For Stream-Stream joins, all three types (inner, outer, left) 
handle out-of-order records correctly, but the resulted stream may contain 
unnecessary leftRecord-null for left joins, and leftRecord-null or 
null-rightRecord for outer joins. 
- For Stream-Table joins, out-of-order records are not handled 
(i.e., Streams applications don't check for out-of-order records and just 
process all records in offset order), and hence it may produce unpredictable 
results. 
- For Table-Table joins, out-of-order records are not handled 
(i.e., Streams applications don't check for out-of-order records and just 
process all records in offset order). However, the join result is a changelog 
stream and hence will be eventually consistent. 
+ For Stream-Stream joins, all three types (inner, outer, left) 
handle out-of-order records correctly, but the resulting stream may contain 
unnecessary leftRecord-null for left joins, and leftRecord-null or 
null-rightRecord for outer joins.
+This behavior is the same regardless of whether versioned stores 
are used.

Review Comment:
   > This behavior is the same regardless of whether versioned stores are used.
   
   That's confusing, because versioned stores only apply to KTables, but for a KStream-KStream join there is no KTable. The stores used are totally internal (not even exposed via IQ, IIRC).



##
docs/streams/developer-guide/dsl-api.html:
##
@@ -3609,6 +3631,52 @@ KTable-KTable 
Foreign-Key
  

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-24 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1175703194


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range 
assignor and the sticky assignor.
+ * Properties are as follows:
+ * 
+ *  Each member must get at least one partition for every topic that it is 
subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range) 
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas.
+ *  The topics involved must have the same number of partitions.
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+static class RemainingAssignmentsForMember {
+private final String memberId;
+private final Integer remaining;
+
+public RemainingAssignmentsForMember(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Integer remaining() {
+return remaining;
+}
+
+}
+
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn(memberId + " subscribed to topic " + topicId + " 
which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+private Map<Uuid, List<Integer>> getUnassignedPartitionsPerTopic(final AssignmentSpec assignmentSpec, Map<Uuid, Set<Integer>> assignedStickyPartitionsPerTopic) {
+Map<Uuid, List<Integer>> unassignedPartitionsPerTopic = new HashMap<>();
+Map<Uuid, AssignmentTopicMetadata> topicsMetadata = assignmentSpec.topics();
+
+topicsMetadata.forEach((topicId, assignmentTopicMetadata) -> {
+ArrayList<Integer> unassignedPartitionsForTopic = new ArrayList<>();
+int numPartitions = assignmentTopicMetadata.numPartitions();
+// List of unassigned partitions per topic contains the partitions 
in ascending order.
+Set<Integer> assignedStickyPartitionsForTopic = assignedStickyPartitionsPerTopic.getOrDefault(topicId, new HashSet<>());
+for (int i = 0; i < numPartitions; i++) {
+if 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-24 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1175701652


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
(diff body omitted: same RangeAssignor.java hunk as quoted earlier in this digest)

[GitHub] [kafka] mumrah commented on a diff in pull request #13407: KAFKA-14805 KRaft controller supports pre-migration mode

2023-04-24 Thread via GitHub


mumrah commented on code in PR #13407:
URL: https://github.com/apache/kafka/pull/13407#discussion_r1175698954


##
core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala:
##
@@ -145,25 +146,29 @@ class BrokerRegistrationRequestTest {
   }
 
   @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, 
metadataVersion = MetadataVersion.IBP_3_3_IV3,
-serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "true")))
-  def testRegisterZkWithKRaftOldMetadataVersion(clusterInstance: 
ClusterInstance): Unit = {
+serverProperties = Array(new ClusterConfigProperty(key = 
"zookeeper.metadata.migration.enable", value = "false")))

Review Comment:
   I added a test in QuorumControllerTest -- it's a little hard to integration 
test since we don't expose the fault handler outside of TestKit (which maybe we 
should)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-24 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1175689958


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
(diff body omitted: same RangeAssignor.java hunk as quoted earlier in this digest)

[jira] [Commented] (KAFKA-14927) Dynamic configs not validated when using kafka-configs and --add-config-file

2023-04-24 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715940#comment-17715940
 ] 

Colin McCabe commented on KAFKA-14927:
--

I haven't verified this, but I would expect the same behavior on KRaft, since 
the bug here (if we want to call it that) is a mismatch between the validation 
in ConfigCommand and the lack of validation on the broker.

A good fix would probably be to only allow config keys matching 
'[a-zA-Z0-9._-]+'. We can always revisit if people want more options for 
config keys (so far I'm not aware of anyone who would).

This is maybe a bit of a grey area: you could argue that it needs a KIP, 
although in some sense it's really formalizing what we've been doing all 
along...
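
For illustration, such a check is a one-liner with java.util.regex; the exact 
pattern below is an assumption based on the suggestion above, not a settled rule:

{code:java}
import java.util.regex.Pattern;

// Sketch of the proposed restriction on config keys (pattern is assumed).
public class ConfigKeyValidator {
    private static final Pattern VALID_CONFIG_KEY = Pattern.compile("[a-zA-Z0-9._-]+");

    public static void validate(String key) {
        if (!VALID_CONFIG_KEY.matcher(key).matches()) {
            throw new IllegalArgumentException("Invalid config key: " + key);
        }
    }

    public static void main(String[] args) {
        validate("log.retention.ms");          // passes
        try {
            validate("\"routes\"");            // quotes are not valid key characters
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Invalid config key: "routes"
        }
    }
}
{code}

With a check like this, malformed keys such as the {{"routes"}} entry shown in 
the report below would be rejected before being written as dynamic configs.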

> Dynamic configs not validated when using kafka-configs and --add-config-file
> 
>
> Key: KAFKA-14927
> URL: https://issues.apache.org/jira/browse/KAFKA-14927
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 3.3.2
>Reporter: Justin Daines
>Assignee: José Armando García Sancio
>Priority: Minor
>
> Using {{kafka-configs}} should validate dynamic configurations before 
> applying. It is possible to send a file with invalid configurations. 
> For example a file containing the following:
> {code:java}
> {
>   "routes": {
>     "crn:///kafka=*": {
>       "management": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied"
>       },
>       "describe": {
>         "allowed": "",
>         "denied": "confluent-audit-log-events-denied"
>       },
>       "authentication": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied-authn"
>       },
>       "authorize": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied-authz"
>       },
>       "interbroker": {
>         "allowed": "",
>         "denied": ""
>       }
>     },
>     "crn:///kafka=*/group=*": {
>       "consume": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       }
>     },
>     "crn:///kafka=*/topic=*": {
>       "produce": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       },
>       "consume": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       }
>     }
>   },
>   "destinations": {
>     "topics": {
>       "confluent-audit-log-events": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied-authn": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied-authz": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events_audit": {
>         "retention_ms": 777600
>       }
>     }
>   },
>   "default_topics": {
>     "allowed": "confluent-audit-log-events_audit",
>     "denied": "confluent-audit-log-events"
>   },
>   "excluded_principals": [
>     "User:schemaregistryUser",
>     "User:ANONYMOUS",
>     "User:appSA",
>     "User:admin",
>     "User:connectAdmin",
>     "User:connectorSubmitter",
>     "User:connectorSA",
>     "User:schemaregistryUser",
>     "User:ksqlDBAdmin",
>     "User:ksqlDBUser",
>     "User:controlCenterAndKsqlDBServer",
>     "User:controlcenterAdmin",
>     "User:restAdmin",
>     "User:appSA",
>     "User:clientListen",
>     "User:superUser"
>   ]
> } {code}
> {code:java}
> kafka-configs --bootstrap-server $KAFKA_BOOTSTRAP --entity-type brokers 
> --entity-default --alter --add-config-file audit-log.json {code}
> Yields the following dynamic configs:
> {code:java}
> Default configs for brokers in the cluster are:
>   "destinations"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"destinations"=null}
>   "confluent-audit-log-events-denied-authn"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events-denied-authn"=null}
>   "routes"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"routes"=null}
>   "User=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"User=null}
>   },=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:},=null}
>   "excluded_principals"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"excluded_principals"=null}
>   "confluent-audit-log-events_audit"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events_audit"=null}
>   "authorize"=null sensitive=true 
> 

[jira] [Assigned] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join

2023-04-24 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee reassigned KAFKA-14049:
--

Assignee: Philip Nee

> Relax Non Null Requirement for KStreamGlobalKTable Left Join
> 
>
> Key: KAFKA-14049
> URL: https://issues.apache.org/jira/browse/KAFKA-14049
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Saumya Gupta
>Assignee: Philip Nee
>Priority: Major
>  Labels: beginner, newbie
>
> Null values in the stream for a left join indicate a tombstone message that 
> needs to be propagated even if it is not actually joined with a GlobalKTable 
> message; hence these messages should not be ignored.
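
For illustration, a minimal Streams topology that hits this code path might 
look like the sketch below; the topic names and value types are assumptions, 
not from the ticket:

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

// Illustrative KStream-GlobalKTable left join (topics and types assumed).
public class LeftJoinExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orders = builder.stream("orders");
        GlobalKTable<String, String> customers = builder.globalTable("customers");

        // Under the current non-null requirement, an order record with a null
        // value (a tombstone) is dropped before it ever reaches the joiner;
        // this ticket asks for it to be propagated instead.
        orders.leftJoin(
                customers,
                (orderId, order) -> orderId,                 // key selector into the global table
                (order, customer) -> order + "/" + customer) // joiner; customer may be null
              .to("enriched-orders");

        System.out.println(builder.build().describe());
    }
}
{code}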



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim commented on pull request #13603: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-24 Thread via GitHub


jeffkbkim commented on PR #13603:
URL: https://github.com/apache/kafka/pull/13603#issuecomment-1520578217

   tests are unrelated
   ```
   Build / JDK 8 and Scala 2.12 / testDescribeQuorumRequestToBrokers() – 
kafka.server.KRaftClusterTest
   31s
   Build / JDK 8 and Scala 2.12 / testUpdateMetadataVersion() – 
kafka.server.KRaftClusterTest
   19s
   Build / JDK 17 and Scala 2.13 / 
testDescribeAtMinIsrPartitions(String).quorum=kraft – 
kafka.admin.TopicCommandIntegrationTest
   12s
   Build / JDK 11 and Scala 2.13 / testSeparateOffsetsTopic – 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest
   2m 58s
   Build / JDK 11 and Scala 2.13 / 
testSendOffsetsWithNoConsumerGroupWriteAccess(String).quorum=kraft – 
kafka.api.AuthorizerIntegrationTest
   13s
   Build / JDK 11 and Scala 2.13 / testConsumptionWithBrokerFailures() – 
kafka.api.ConsumerBounceTest
   30s
   Build / JDK 11 and Scala 2.13 / testCreateClusterAndCreateListDeleteTopic() 
– kafka.server.KRaftClusterTest
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on pull request #13601: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-24 Thread via GitHub


jeffkbkim commented on PR #13601:
URL: https://github.com/apache/kafka/pull/13601#issuecomment-1520572197

   this one as well
   ```
   [2023-04-24T15:00:18.297Z] FAILURE: Build failed with an exception.
   
   [2023-04-24T15:00:18.297Z] 
   
   [2023-04-24T15:00:18.297Z] * What went wrong:
   
   [2023-04-24T15:00:18.297Z] Execution failed for task 
':streams:streams-scala:compileScala'.
   
   [2023-04-24T15:00:18.297Z] > Timeout waiting to lock zinc-1.3.5_2.13.6_17 
compiler cache (/home/jenkins/.gradle/caches/7.2/zinc-1.3.5_2.13.6_17). It is 
currently in use by another Gradle instance.
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on pull request #13600: KAFKA-14869: Bump coordinator value records to flexible versions (KIP…

2023-04-24 Thread via GitHub


jeffkbkim commented on PR #13600:
URL: https://github.com/apache/kafka/pull/13600#issuecomment-1520571706

   @dajac i'm seeing
   ```
   
   [2023-04-24T15:00:10.166Z] FAILURE: Build failed with an exception.
   
   [2023-04-24T15:00:10.166Z] 
   
   [2023-04-24T15:00:10.166Z] * What went wrong:
   
   [2023-04-24T15:00:10.166Z] Execution failed for task 
':streams:streams-scala:compileScala'.
   
   [2023-04-24T15:00:10.166Z] > Timeout waiting to lock zinc-1.3.5_2.12.14_8 
compiler cache (/home/jenkins/.gradle/caches/7.1.1/zinc-1.3.5_2.12.14_8). It is 
currently in use by another Gradle instance.
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-24 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1175567311


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
(diff body omitted: same RangeAssignor.java hunk as quoted earlier in this digest)

[GitHub] [kafka] jolshan opened a new pull request, #13632: KAFKA-14931: Revert KAFKA-14561 in 3.5

2023-04-24 Thread via GitHub


jolshan opened a new pull request, #13632:
URL: https://github.com/apache/kafka/pull/13632

   There were too many blocker bugs, so it is easier to revert for this release.
   
   The revert was clean.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14931) Revert KAFKA-14561 in 3.5

2023-04-24 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-14931:
---
Description: We have too many blockers for this commit to work well, so in 
the interest of code quality, we should revert 
https://issues.apache.org/jira/browse/KAFKA-14561 in 3.5 and fix the issues for 
3.6  (was: We have too many blockers for this commit to work well, so in the 
interest of code quality, we should revert in 3.5 and fix the issues for 3.6)

> Revert KAFKA-14561 in 3.5
> -
>
> Key: KAFKA-14931
> URL: https://issues.apache.org/jira/browse/KAFKA-14931
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.5.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> We have too many blockers for this commit to work well, so in the interest of 
> code quality, we should revert 
> https://issues.apache.org/jira/browse/KAFKA-14561 in 3.5 and fix the issues 
> for 3.6



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14931) Revert KAFKA-14561 in 3.5

2023-04-24 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-14931:
---
Affects Version/s: 3.5.0

> Revert KAFKA-14561 in 3.5
> -
>
> Key: KAFKA-14931
> URL: https://issues.apache.org/jira/browse/KAFKA-14931
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.5.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> We have too many blockers for this commit to work well, so in the interest of 
> code quality, we should revert in 3.5 and fix the issues for 3.6



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14884) Include check transaction is still ongoing right before append

2023-04-24 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-14884:
---
Affects Version/s: 3.6.0
   (was: 3.5.0)

> Include check transaction is still ongoing right before append 
> ---
>
> Key: KAFKA-14884
> URL: https://issues.apache.org/jira/browse/KAFKA-14884
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.6.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> Even after checking via AddPartitionsToTxn, the transaction could be aborted 
> after the response. We can add one more check before appending.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14920) Address timeouts and out of order sequences

2023-04-24 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-14920:
---
Affects Version/s: 3.6.0

> Address timeouts and out of order sequences
> ---
>
> Key: KAFKA-14920
> URL: https://issues.apache.org/jira/browse/KAFKA-14920
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.6.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> KAFKA-14844 showed the destructive nature of a timeout on the first produce 
> request for a topic partition (ie one that has no state in psm)
> Since we currently don't validate the first sequence (we will in part 2 of 
> kip-890), any transient error on the first produce can lead to out of order 
> sequences that never recover.
> Originally, KAFKA-14561 relied on the producer's retry mechanism for these 
> transient issues, but until that is fixed, we may need to retry from in the 
> AddPartitionsManager instead. We addressed the concurrent transactions, but 
> there are other errors like coordinator loading that we could run into and 
> see increased out of order issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14916) Fix code that assumes transactional ID implies all records are transactional

2023-04-24 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-14916:
---
Affects Version/s: 3.6.0

> Fix code that assumes transactional ID implies all records are transactional
> 
>
> Key: KAFKA-14916
> URL: https://issues.apache.org/jira/browse/KAFKA-14916
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.6.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> KAFKA-14561 wrote code that assumed that if a transactional ID was included, 
> all record batches were transactional and had the same producer ID.
> This work with improve validation and fix the code that assumes all batches 
> are transactional.
> Currently the code does not enforce that there can not be differing producer 
> IDs. This will be enforced. 
> Further, KAFKA-14561 will not assume all records are transactional.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14931) Revert KAFKA-14561 in 3.5

2023-04-24 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-14931:
--

 Summary: Revert KAFKA-14561 in 3.5
 Key: KAFKA-14931
 URL: https://issues.apache.org/jira/browse/KAFKA-14931
 Project: Kafka
  Issue Type: Task
Reporter: Justine Olshan
Assignee: Justine Olshan


We have too many blockers for this commit to work well, so in the interest of 
code quality, we should revert in 3.5 and fix the issues for 3.6



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mumrah commented on pull request #13257: MINOR: Add ZK migration docs to the packaged docs

2023-04-24 Thread via GitHub


mumrah commented on PR #13257:
URL: https://github.com/apache/kafka/pull/13257#issuecomment-1520515387

   @mimaison i'm fairly occupied with #13407 and #13461 at the moment, but 
could probably find time for this PR in between things this week or next. 
   
   Come to think of it, I'll need to go through the docs before 3.5 to make 
some adjustments based on recent work (e.g., we now support ACL and soon will 
support SCRAM migration). 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-24 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1175552910


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
(diff body omitted: same RangeAssignor.java hunk as quoted earlier in this digest)

Kafka Node Shutting Down Automatically

2023-04-24 Thread Akshay Kumar
Hello team,

   - We are using ZooKeeper-less Kafka (Kafka KRaft).
   - The cluster has 3 nodes.
   - One of the nodes shuts down automatically at random.
   - We checked the logs but couldn't find the exact reason.
   - Kafka version - 3.3.1
   - Attaching the log files.
   - Time - 2023-04-21 16:28:23

*state-change.log -*
https://drive.google.com/file/d/1eS-ShKlhGPsIJoybHndlhahJnucU8RWA/view?usp=share_link

*server.log -*
https://drive.google.com/file/d/1Ov5wrQIqx2AS4J7ppFeHJaDySsfsK588/view?usp=share_link

Regards,
*Akshay Kumar*


[GitHub] [kafka] urbandan commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-24 Thread via GitHub


urbandan commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1175425019


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
     OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
     TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-    offsetSyncs.put(sourceTopicPartition, offsetSync);
+    offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+    offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
 }
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+    // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+    // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+    OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+    updateSyncArray(mutableSyncs, offsetSync);
+    if (log.isTraceEnabled()) {
+        StringBuilder stateString = new StringBuilder();
+        stateString.append("[");
+        for (int i = 0; i < Long.SIZE; i++) {
+            if (i != 0) {
+                stateString.append(",");
+            }
+            if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                // Print only if the sync is interesting; a series of repeated syncs appears as repeated commas
+                stateString.append(mutableSyncs[i].upstreamOffset());
+                stateString.append(":");
+                stateString.append(mutableSyncs[i].downstreamOffset());
+            }
+        }
+        stateString.append("]");
+        log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
+    }
+    return mutableSyncs;
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+    OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+    clearSyncArray(syncs, firstSync);
+    return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+    for (int i = 0; i < Long.SIZE; i++) {
+        syncs[i] = offsetSync;
+    }
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+    long upstreamOffset = offsetSync.upstreamOffset();
+    // Old offsets are invalid, so overwrite them all.
+    if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   I understand the points about monotonicity, and it would be a problem if a 
user noticed it, but I don't really expect users to notice:
   1. The current API to access checkpoints (in MirrorClient) only returns the 
latest checkpoint, and it is usually used for failover. Users would need 
advanced/custom monitoring to keep scraping checkpoints and spot non-monotonic 
ones, which seems highly unlikely to me. Even if that were the case, we could 
just document that monotonicity is not guaranteed.
   2. The auto offset sync feature checks the committed offsets in the target 
cluster and does not rewind, so it is already protected in this sense.
   
   I understand that my points are not convincing (enough) to make an impact on 
this change - as @gharris1727 suggested, I will try to think of a new solution 
for offset translation.
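
   For reference, the checkpoint access mentioned in point 1 is typically 
driven through RemoteClusterUtils during failover; a minimal sketch, with the 
cluster alias, group id, and connection properties as placeholders:

   ```java
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.connect.mirror.RemoteClusterUtils;

   import java.time.Duration;
   import java.util.HashMap;
   import java.util.Map;

   // Failover sketch: translate the group's committed offsets from the source
   // cluster into target-cluster offsets using the latest checkpoint per partition.
   public class FailoverOffsets {
       public static void main(String[] args) throws Exception {
           Map<String, Object> props = new HashMap<>();
           props.put("bootstrap.servers", "target-kafka:9092"); // placeholder

           Map<TopicPartition, OffsetAndMetadata> translated =
               RemoteClusterUtils.translateOffsets(props, "source", "my-group", Duration.ofSeconds(30));

           // A consumer on the target cluster would seek() to these offsets; only
           // the latest checkpoint is visible here, so non-monotonic intermediate
           // checkpoints would indeed go unnoticed by this API.
           translated.forEach((tp, offset) -> System.out.println(tp + " -> " + offset.offset()));
       }
   }
   ```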
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13631: KAFKA-14909: check zkMigrationReady tag before migration

2023-04-24 Thread via GitHub


mumrah commented on code in PR #13631:
URL: https://github.com/apache/kafka/pull/13631#discussion_r1175362654


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -134,8 +139,7 @@ private void initializeMigrationState() {
 }
 
 private boolean isControllerQuorumReadyForMigration() {
-// TODO implement this
-return true;
+return this.apiVersions.isAllNodeZkMigrationReady();

Review Comment:
   When we check to see if the quorum nodes are ready, it would be useful to 
print an error log showing which controllers were not ready here (similar to 
what we do in `areZkBrokersReadyForMigration`). 
   
   
   
   



##
clients/src/main/java/org/apache/kafka/clients/ApiVersions.java:
##
@@ -64,4 +64,8 @@ public synchronized byte maxUsableProduceMagic() {
 return maxUsableProduceMagic;
 }
 
+// check if all nodes are ZK Migration ready
+public boolean isAllNodeZkMigrationReady() {

Review Comment:
   Can we move this logic into QuorumFeatures? That would allow us to verify 
that all the expected controllers are present in the ApiVersions (in addition 
to checking the `zkMigrationEnabled` flag)
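
   For illustration, a readiness check in QuorumFeatures along those lines 
could take roughly this shape; the nested types below are hypothetical 
stand-ins for the real quorum membership and ApiVersions wiring:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical sketch: verify every expected controller has reported
   // ApiVersions with the migration flag set, and log the ones that haven't.
   class MigrationReadinessSketch {
       interface NodeApiVersions { boolean zkMigrationEnabled(); }
       interface ApiVersionsView { NodeApiVersions get(String nodeId); }

       static boolean isControllerQuorumReadyForMigration(List<String> quorumNodeIds,
                                                          ApiVersionsView apiVersions) {
           List<String> notReady = new ArrayList<>();
           for (String nodeId : quorumNodeIds) {
               NodeApiVersions versions = apiVersions.get(nodeId);
               if (versions == null || !versions.zkMigrationEnabled()) {
                   notReady.add(nodeId); // missing ApiVersions or migration not enabled
               }
           }
           if (!notReady.isEmpty()) {
               System.err.println("Controllers not ready for ZK migration: " + notReady);
               return false;
           }
           return true;
       }
   }
   ```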



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14918) KRaft controller sending ZK controller RPCs to KRaft brokers

2023-04-24 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715852#comment-17715852
 ] 

Mickael Maison commented on KAFKA-14918:


[~mumrah] The associated PR is merged, can we resolve this ticket? Or is there 
more work to do?
Also, is this the same issue as 
https://issues.apache.org/jira/browse/KAFKA-14698 or something different?

> KRaft controller sending ZK controller RPCs to KRaft brokers
> 
>
> Key: KAFKA-14918
> URL: https://issues.apache.org/jira/browse/KAFKA-14918
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.5.0
>
>
> During the migration, when upgrading a ZK broker to KRaft, the controller is 
> incorrectly sending UpdateMetadata requests to the KRaft controller. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison commented on pull request #13257: MINOR: Add ZK migration docs to the packaged docs

2023-04-24 Thread via GitHub


mimaison commented on PR #13257:
URL: https://github.com/apache/kafka/pull/13257#issuecomment-1520246270

   @mumrah Let me know if you don't have time to finish this PR. We should 
merge this to trunk and backport to 3.5. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13257: MINOR: Add ZK migration docs to the packaged docs

2023-04-24 Thread via GitHub


mimaison commented on code in PR #13257:
URL: https://github.com/apache/kafka/pull/13257#discussion_r1175352627


##
docs/ops.html:
##
@@ -3550,22 +3550,220 @@ Deploying Considerations
 
   
-Kafka server's process.role should be set to either 
broker or controller but not both. Combined mode can 
be used in development enviroment but it should be avoided in critical 
deployment evironments.
-For redundancy, a Kafka cluster should use 3 controllers. More than 3 
servers is not recommended in critical environments. In the rare case of a 
partial network failure it is possible for the cluster metadata quorum to 
become unavailable. This limitation will be addresses in a future release of 
Kafka.
+Kafka server's process.role should be set to either 
broker or controller but not both. Combined mode can 
be used in development environments, but it should be avoided in critical 
deployment environments.
+For redundancy, a Kafka cluster should use 3 controllers. More than 3 
servers is not recommended in critical environments. In the rare case of a 
partial network failure it is possible for the cluster metadata quorum to 
become unavailable. This limitation will be addressed in a future release of 
Kafka.
The Kafka controllers store all of the metadata for the cluster in 
memory and on disk. We believe that for a typical Kafka cluster 5GB of main 
memory and 5GB of disk space on the metadata log directory is sufficient.
+  
 
   Missing Features
 
-  The following features are not fullying implemented in KRaft mode:
+  The following features are not fully implemented in KRaft mode:
 
   
 Configuring SCRAM users via the administrative API
 Supporting JBOD configurations with multiple storage directories
 Modifying certain dynamic configurations on the standalone KRaft 
controller
 Delegation tokens
-Upgrade from ZooKeeper mode
   
 
+  ZooKeeper to KRaft 
Migration
+
+  
+ZooKeeper to KRaft migration is considered an Early Access feature in 
3.4.0 and is not recommended for production clusters.
+  
+
+  The following features are not yet supported for ZK to KRaft 
migrations:
+
+  
+Downgrading to ZooKeeper mode during or after the migration
+Migration of ACLs
+Other features not yet supported in 
KRaft
+  
+
+  
+Please report issues with ZooKeeper to KRaft migration using the
+https://issues.apache.org/jira/projects/KAFKA; 
target="_blank">project JIRA and the "kraft" component.
+  
+
+  Terminology
+  
+We use the term "migration" here to refer to the process of changing a 
Kafka cluster's metadata
+system from ZooKeeper to KRaft and migrating existing metadata. An 
"upgrade" refers to installing a newer version of Kafka. It is not recommended 
to
+upgrade the software at the same time as performing a metadata migration.
+  
+
+  
+We also use the term "ZK mode" to refer to Kafka brokers which are using 
ZooKeeper as their metadata
+system. "KRaft mode" refers Kafka brokers which are using a KRaft 
controller quorum as their metadata system.
+  
+
+  Preparing for migration
+  
+Before beginning the migration, the Kafka brokers must be upgraded to 
software version 3.4.0 and have the
+"inter.broker.protocol.version" configuration set to "3.4". See Upgrading to 3.4.0 for
+upgrade instructions.
+  
+
+  
+It is recommended to enable TRACE level logging for the migration 
components while the migration is active. This can
+be done by adding the following log4j configuration to each KRaft 
controller's "log4j.properties" file.
+  
+
+  log4j.logger.org.apache.kafka.metadata.migration=TRACE
+
+  
+It is generally useful to enable DEBUG logging on the KRaft controllers 
and the ZK brokers during the migration.
+  
+
+  Provisioning the KRaft controller quorum
+  
+Two things are needed before the migration can begin. First, the brokers 
must be configured to support the migration and second,
+a KRaft controller quorum must be deployed. The KRaft controllers should 
be provisioned with the same cluster ID as
+the existing Kafka cluster. This can be found by examining one of the 
"meta.properties" files in the data directories
+of the brokers, or by running the following command.
+  
+
+  ./bin/zookeeper-shell.sh localhost:2181 get /cluster/id
+
+  
+The KRaft controller quorum should also be provisioned with the latest 
metadata.version of "3.4".
+For further instructions on KRaft deployment, please refer to the above documentation.
+  
+
+  
+In addition to the standard KRaft configuration, the KRaft controllers 
will need to enable support for the migration
+as well as provide ZooKeeper connection configuration.
+  
+
+  
+Here is a sample config for a KRaft controller that is ready for migration:
+  
+  
+# Sample KRaft cluster controller.properties listening on 9093
+process.roles=controller
+node.id=3000
+controller.quorum.voters=3000@localhost:9093

[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Sticky Range Assignor on the Server (KIP-848)

2023-04-24 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1175273145


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,261 @@
(diff body omitted: same RangeAssignor.java hunk as quoted earlier in this digest)

[jira] [Resolved] (KAFKA-14925) The website shouldn't load external resources

2023-04-24 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-14925.

Resolution: Fixed

> The website shouldn't load external resources
> -
>
> Key: KAFKA-14925
> URL: https://issues.apache.org/jira/browse/KAFKA-14925
> Project: Kafka
>  Issue Type: Improvement
>  Components: website
>Reporter: Mickael Maison
>Assignee: Atul Sharma
>Priority: Major
>
> In includes/_header.htm, we load a resource from fontawesome.com



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14925) The website shouldn't load external resources

2023-04-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715804#comment-17715804
 ] 

ASF GitHub Bot commented on KAFKA-14925:


mimaison merged PR #506:
URL: https://github.com/apache/kafka-site/pull/506




> The website shouldn't load external resources
> -
>
> Key: KAFKA-14925
> URL: https://issues.apache.org/jira/browse/KAFKA-14925
> Project: Kafka
>  Issue Type: Improvement
>  Components: website
>Reporter: Mickael Maison
>Assignee: Atul Sharma
>Priority: Major
>
> In includes/_header.htm, we load a resource from fontawesome.com



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14630) Update zstd-jni version to 1.5.5

2023-04-24 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14630:
-
Fix Version/s: 3.5.0

> Update zstd-jni version to 1.5.5
> 
>
> Key: KAFKA-14630
> URL: https://issues.apache.org/jira/browse/KAFKA-14630
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Divij Vaidya
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.5.0
>
>
> The latest version of zstd-jni as of Jan 2023 is 1.5.2-5. Note that this 
> version inflates the size of the binary [2]. Most of the improvements in the 
> diff are associated with DirectBuffer streams, which we don't currently use 
> but *could*. Also note that other Apache projects such as Spark [3] have been 
> using the upgraded version.
> {noformat}
> diviv:zstd-jni/ (master) $ git log --pretty=oneline v1.5.2-1...c1.5.2-5       
>                                              [14:13:34]
> 32b337aed4b3f9b0edf6e0bfaa470ee777128c30 (tag: c1.5.2-5) v1.5.2-5
> 4a46d8126e9bd7f37ae6ebd7d1ad36594fee7847 Fix #151: add support for magicless 
> frames
> d168ce7198dd1827d0f148a6948ff0bb0dd04df3 Fix typos in README and code
> 612e7ebb41e35061209748b8a231fd7e8bef4bcb Fix typos in README and code
> 451df09a7717793f0eae7c310378f8d33907ec85 Upgrade to ScalaTest-3.2
> 37faee49643b2739457716e3752f44fcaa09005f v1.5.2-4
> dcadaf0838412c96d26d44792d96384af55af0e7 Add option to increase the 
> decompression window
> a3b5c4c1a02ddee56f6e2b019d6c8b52f8e63411 Add method to disable the native 
> library loading
> 8e73994bc7731407a3ddea160298ca141dc3b190 Use new ZstdIOException when 
> replacing IOException
> bdbb00b0a45a537a3ac218382d19e47b56156c68 Throw ZstdException when the error 
> is coming from Zstd
> 905202f10e5355cf7ed7a558e909558bc6ebf184 Use negative values of error codes
> 2a19b846c2ccdbc20adfb4a2b4a45b635f641e19 Remove dependency on internal headers
> c970b548bb657df719fb5742a9496e6b3c07f0e3 Wrong srcSize used
> 1f04f24d02bd7c74c890b1325766b808fb353b4e Suppress warnings
> 5d89af50095f91a4da6628d99d4012823da195ff Fix copy/paste error
> 4b64eca0ce9ad8157474b453789d1305c2462c03 Update docs for Android tests
> 3320b520332c70c2bd48d2d7fdf68f0f6c87126a Do not use CXX flags
> c983ae3e086b63a40e1bb430cb2ebf95ecc52c71 (tag: v1.5.2-3) Adjust signature 
> comments after e5c6a3290b8335db7c70877fda22ca26a96c72e4.
> 510bbd6be80592227c6e5cf8cd8d71cb76c0c279 Add methods for streaming 
> (de)compression of direct ByteBuffers.
> 62b9dad49fc00f253cb35c1942c3ca6af4ee2b47 Fix lgtm C++.
> 73ae46e1af16619143b7c87e35ad9c05363e2c97 v1.5.2-3
> e5c6a3290b8335db7c70877fda22ca26a96c72e4 Fix overflows
> 54d3d50c16d96bd8a30e2d4c0a9648001a52d6f9 Fix some error return codes.
> b788a2ed7a5e36e5252b1696e6cc8bae48a7afbc Upgrade scala.
> 31060934c26e080031465702ec369591e12874f8 Add NoFinalizer variants for the 
> direct buffer streams.
> 3d5ab915167f2dfe244d2d2192b66a9feac7c543 (tag: v1.5.2-2) Fix the symbols 
> export on the cross-compiled libraries also
> 8a4993f57119e2b682d1f0b8db263027983208c6 Use different approach for MacOS to 
> not export all symbols
> 15352a3941e8bec2dd1f374768478c02eaf69d6f Don't pass version-script to clang 
> linker, it's not supported
> 9916cbd611cd882b5d4958d4acf0518c01363487 Don't export the zstd symbols, just 
> our own ones
> 277cb2779e2e1922d6de0cf4e1e7519d2647acef fix spelling
> ff3a141d78abc71741734b0d0524bf47096b29f8 Fallback to `scp` if `rsync` in not 
> installed on the build machines
> ba99eec9ac9a5c3eb16200ae67c235a91b16b570 Up the version
> 4c4d9cd382b6e515a7d0d6cd37c1ebb087f5ab73 Support LoongArch64
> 97550e35610cd36e2ad510e20e503c0f997c1a3a Remove 
> frameHeaderSize(Min|Max){noformat}
>  [2] [https://github.com/luben/zstd-jni/issues/237] 
>  
> [3][https://github.com/apache/spark/commit/c50d865fa9eb5207bc8c9992e37843412ec0cbc3]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14630) Update zstd-jni version to 1.5.2-5

2023-04-24 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya resolved KAFKA-14630.
--
  Assignee: Ismael Juma  (was: Divij Vaidya)
Resolution: Fixed

> Update zstd-jni version to 1.5.2-5
> --
>
> Key: KAFKA-14630
> URL: https://issues.apache.org/jira/browse/KAFKA-14630
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Divij Vaidya
>Assignee: Ismael Juma
>Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14630) Update zstd-jni version to 1.5.5

2023-04-24 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14630:
-
Summary: Update zstd-jni version to 1.5.5  (was: Update zstd-jni version to 
1.5.2-5)

> Update zstd-jni version to 1.5.5
> 
>
> Key: KAFKA-14630
> URL: https://issues.apache.org/jira/browse/KAFKA-14630
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Divij Vaidya
>Assignee: Ismael Juma
>Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] yashmayya commented on pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime

2023-04-24 Thread via GitHub


yashmayya commented on PR #13594:
URL: https://github.com/apache/kafka/pull/13594#issuecomment-1520127170

   @vamossagar12 the behavior in `MemoryOffsetBackingStore` should probably be 
changed - if it fails to shut down cleanly and throws an exception, it'll cause 
further resource closures to be skipped in the worker - 
https://github.com/apache/kafka/blob/2271e748a11919d07698ebce759dca2e3075596a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L241-L264
   
   Also, I'm not sure why the `Worker` can't use this new utility method for 
closing its executor. The main difference is the additional call to 
`awaitTermination` after the call to `shutdownNow`, which seems redundant 
anyway. There's also the preservation of the thread's interrupt status, which 
I'm thinking the `ThreadUtils::shutdownExecutorServiceQuietly` method should 
maybe also be doing - WDYT?
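   For reference, a minimal sketch of the two-phase pattern from the 
`ExecutorService` javadoc, with the interrupt-status restoration discussed 
above (assuming `java.util.concurrent` imports; this is not the PR's code):
   ```java
   void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // stop accepting new tasks
       try {
           // Phase 1: wait for in-flight tasks to finish on their own
           if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
               pool.shutdownNow(); // Phase 2: interrupt lingering tasks
               if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                   System.err.println("Pool did not terminate");
           }
       } catch (InterruptedException ie) {
           pool.shutdownNow();
           Thread.currentThread().interrupt(); // preserve the interrupt status
       }
   }
   ```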


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14630) Update zstd-jni version to 1.5.2-5

2023-04-24 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715783#comment-17715783
 ] 

Divij Vaidya commented on KAFKA-14630:
--

This was merged in 
https://github.com/apache/kafka/commit/9c12e462106343fbc6af5873074d48f98687af39

> Update zstd-jni version to 1.5.2-5
> --
>
> Key: KAFKA-14630
> URL: https://issues.apache.org/jira/browse/KAFKA-14630
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hudeqi commented on pull request #13617: MINOR:code optimization in QuorumController

2023-04-24 Thread via GitHub


hudeqi commented on PR #13617:
URL: https://github.com/apache/kafka/pull/13617#issuecomment-1520104888

   Hello, Guozhang, please review if you have time. @guozhangwang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on pull request #13473: KAFKA-14866:Remove controller module metrics when broker is shutting down

2023-04-24 Thread via GitHub


hudeqi commented on PR #13473:
URL: https://github.com/apache/kafka/pull/13473#issuecomment-1520101597

   > Hello, can you help to review this PR? @mimaison
   
   and ping here @mimaison 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on pull request #13571: KAFKA-14907:Add the traffic metric of the partition dimension in BrokerTopicStats

2023-04-24 Thread via GitHub


hudeqi commented on PR #13571:
URL: https://github.com/apache/kafka/pull/13571#issuecomment-1520098448

   > @hudeqi Thanks for the PR and KIP. The KIP has to be discussed and voted 
on by the community. See the process section in 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
   > 
   > Also a previous KIP had similar goals: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148642648
   
   Sorry, I didn't notice that a KIP had raised this issue before. I read the 
previous KIP discussion and it seems there was no conclusion - so how should we 
handle this issue next?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon opened a new pull request, #13631: KAFKA-14909: check zkMigrationReady tag before migration

2023-04-24 Thread via GitHub


showuon opened a new pull request, #13631:
URL: https://github.com/apache/kafka/pull/13631

   1. add `ZkMigrationReady` in apiVersionsResponse
   2. check that all nodes report `ZkMigrationReady` before moving to the next 
migration state (a rough sketch of the check is shown below)
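   A rough sketch of the intended quorum check (the `zkMigrationReady()` 
accessor on `NodeApiVersions` is a placeholder; the real field and method 
names may differ):
   ```java
   // Sketch only: returns true when every known quorum node advertises the
   // new ZkMigrationReady tagged field in its ApiVersions response.
   boolean isControllerQuorumReadyForMigration(Collection<NodeApiVersions> quorumNodes) {
       return !quorumNodes.isEmpty()
           && quorumNodes.stream().allMatch(NodeApiVersions::zkMigrationReady);
   }
   ```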
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime

2023-04-24 Thread via GitHub


viktorsomogyi commented on code in PR #13594:
URL: https://github.com/apache/kafka/pull/13594#discussion_r1175222973


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java:
##
@@ -216,7 +216,7 @@ public void putTopicStateRetriableFailure() {
 }).when(kafkaBasedLog).send(eq(key), valueCaptor.capture(), 
any(Callback.class));
 
 store.put(topicStatus);
-verify(kafkaBasedLog, times(2)).send(any(), any(), any());
+verify(kafkaBasedLog, timeout(300).times(2)).send(any(), any(), any());

Review Comment:
   I may be a bit pessimistic but I'd bump this up to 1 second.
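   i.e. something like:
   ```java
   verify(kafkaBasedLog, timeout(1000).times(2)).send(any(), any(), any());
   ```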



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13571: KAFKA-14907:Add the traffic metric of the partition dimension in BrokerTopicStats

2023-04-24 Thread via GitHub


mimaison commented on PR #13571:
URL: https://github.com/apache/kafka/pull/13571#issuecomment-1520050947

   @hudeqi Thanks for the PR and KIP. The KIP has to be discussed and voted on 
by the community. See the process section in 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
   
   Also a previous KIP had similar goals: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148642648


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-04-24 Thread via GitHub


machi1990 commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1175200643


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.BufferedInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a copy of {@link ByteBufferInputStream} with the 
following differences:
+ * - Unlike {@link java.io.BufferedInputStream#skip(long)} this class could be 
configured to not push skip() to
+ * input stream. We may want to avoid pushing this to the input stream because 
its implementation may be inefficient,
+ * e.g. the case of ZstdInputStream, which allocates a new buffer from the 
buffer pool per skip call.
+ * - Unlike {@link java.io.BufferedInputStream}, which allocates an 
intermediate buffer, this uses a buffer supplier to
+ * create the intermediate buffer.
+ * 
+ * Note that:
+ * - this class is not thread safe and shouldn't be used in scenarios where 
multiple threads access this.
+ * - the implementation of this class is performance sensitive. Minor changes 
as usage of ByteBuffer instead of byte[]

Review Comment:
   ```suggestion
* - the implementation of this class is performance sensitive. Minor 
changes such as usage of ByteBuffer instead of byte[]
   ```
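   As a rough illustration of the javadoc's point (not the PR's actual 
implementation; `intermediateBuf` and `fill()` are assumed names), a `skip()` 
that drains the intermediate buffer instead of delegating could look like:
   ```java
   @Override
   public long skip(long toSkip) throws IOException {
       long remaining = toSkip;
       while (remaining > 0) {
           // Refill through the normal read path rather than calling skip()
           // on the source stream, which may allocate per call (ZstdInputStream).
           if (!intermediateBuf.hasRemaining() && fill() <= 0) break;
           int advance = (int) Math.min(intermediateBuf.remaining(), remaining);
           intermediateBuf.position(intermediateBuf.position() + advance);
           remaining -= advance;
       }
       return toSkip - remaining;
   }
   ```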



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-24 Thread via GitHub


C0urante commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1175123889


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < Long.SIZE; i++) {
+if (i != 0) {
+stateString.append(",");
+}
+if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != 
mutableSyncs[i - 1]) {
+// Print only if the sync is interesting, a series of 
repeated syncs will appear as ,
+stateString.append(mutableSyncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(mutableSyncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);
+}
+return mutableSyncs;
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+for (int i = 0; i < Long.SIZE; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   I think the most convincing point in favor of monotonicity is that 
violations of it will seem like a bug to users and will almost certainly lead 
to Jira tickets being logged in the future asking us to fix that behavior.
   
   I think it's important to keep in mind that exactly-once offset syncing is 
impossible no matter what, and in addition, any consumer applications that 
cannot tolerate data loss should already have `auto.offset.reset` set to 
`earliest`, regardless of whether we make this change or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-24 Thread via GitHub


C0urante commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1174698662


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -25,17 +25,46 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.ConcurrentHashMap;
 
-/** Used internally by MirrorMaker. Stores offset syncs and performs offset 
translation. */
+/**
+ * Used internally by MirrorMaker. Stores offset syncs and performs offset 
translation.
+ * A limited number of offset syncs can be stored per TopicPartition, in a 
way which provides better translation
+ * later in the topic, closer to the live end of the topic.
+ * This maintains the following invariants for each topic-partition in the 
in-memory sync storage:
+ * 
+ * Invariant A: syncs[0] is the latest offset sync from the syncs 
topic
+ * Invariant B: For each i,j, i < j, syncs[i] != syncs[j]: 
syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i
+ * Invariant C: For each i,j, i < j, syncs[i] != syncs[j]: 
syncs[j].upstream + 2^(i-2) <= syncs[i].upstream

Review Comment:
   I found this easier to grok when invariants B and C both established clear 
bounds on the value of `syncs[i]`:
   
   ```suggestion
* Invariant C: For each i,j, i < j, syncs[i] != syncs[j]: 
syncs[i].upstream >= syncs[j].upstream + 2^(i-2)
   ```



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -25,17 +25,46 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.ConcurrentHashMap;
 
-/** Used internally by MirrorMaker. Stores offset syncs and performs offset 
translation. */
+/**
+ * Used internally by MirrorMaker. Stores offset syncs and performs offset 
translation.
+ * A limited number of offset syncs can be stored per TopicPartition, in a 
way which provides better translation
+ * later in the topic, closer to the live end of the topic.
+ * This maintains the following invariants for each topic-partition in the 
in-memory sync storage:
+ * 
+ * Invariant A: syncs[0] is the latest offset sync from the syncs 
topic
+ * Invariant B: For each i,j, i < j, syncs[i] != syncs[j]: 
syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i
+ * Invariant C: For each i,j, i < j, syncs[i] != syncs[j]: 
syncs[j].upstream + 2^(i-2) <= syncs[i].upstream
+ * Invariant D: syncs[63] is the earliest offset sync from the syncs 
topic which was not eligible for compaction
+ * 
+ * The above invariants ensure that the store is kept updated upon receipt 
of each sync, and that distinct
+ * offset syncs are separated by approximately exponential space. They can be 
checked locally (by comparing all adjacent
+ * indexes) but hold globally (for all pairs of any distance). This allows 
updates to the store in linear time.
+ * Offset translation uses the syncs[i] which most closely precedes the 
upstream consumer group's current offset.
+ * For a fixed in-memory state, translation of variable upstream offsets will 
be monotonic.
+ * For variable in-memory state, translation of a fixed upstream offset will 
not be monotonic.
+ * Translation will be unavailable for all topic-partitions before an 
initial read-to-end of the offset syncs topic
+ * is complete. Translation will be unavailable after that if no syncs are 
present for a topic-partition, if replication
+ * started after the position of the consumer group, or if relevant offset 
syncs for the topic were eligible for
+ * compaction at the time of the initial read-to-end.

Review Comment:
   I'm still not a huge fan of the "eligible for compaction" phrase here and 
above. It does seem to cover the exact scenarios it intends to, but it's a bit 
misleading. The fears we have about reusing offset syncs for which a new sync 
on the same topic partition is available don't have to do with compaction, they 
have to do with compression of in-memory state and that compression not being 
applied identically across restarts.
   
   I'd like it if we could a) use language that doesn't imply that compaction 
is the issue and b) add the rationale for not reusing older offset syncs in a 
comment somewhere (possibly where we do the `!readToEnd` check?).
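   For illustration, the lookup described in the javadoc amounts to something 
like the following sketch (not the PR's code): since `syncs[0]` is the latest 
sync and upstream offsets are non-increasing across the array, the first entry 
at or below the consumer's upstream offset is the closest preceding sync.
   ```java
   OffsetSync closestPrecedingSync(OffsetSync[] syncs, long upstreamOffset) {
       for (OffsetSync sync : syncs) {
           if (sync.upstreamOffset() <= upstreamOffset) {
               return sync; // basis for translating upstreamOffset downstream
           }
       }
       return null; // no usable sync: translation unavailable
   }
   ```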



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +180,141 @@ 

[GitHub] [kafka] urbandan commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

2023-04-24 Thread via GitHub


urbandan commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1175076558


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -139,10 +171,103 @@ public void close() {
 protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
 OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
 TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-offsetSyncs.put(sourceTopicPartition, offsetSync);
+offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> 
createInitialSyncs(offsetSync));
+offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> 
updateExistingSyncs(syncs, offsetSync));
+}
+
+private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync 
offsetSync) {
+// Make a copy of the array before mutating it, so that readers do not 
see inconsistent data
+// TODO: batch updates so that this copy can be performed less often 
for high-volume sync topics.
+OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+updateSyncArray(mutableSyncs, offsetSync);
+if (log.isTraceEnabled()) {
+StringBuilder stateString = new StringBuilder();
+stateString.append("[");
+for (int i = 0; i < Long.SIZE; i++) {
+if (i != 0) {
+stateString.append(",");
+}
+if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != 
mutableSyncs[i - 1]) {
+// Print only if the sync is interesting, a series of 
repeated syncs will appear as ,
+stateString.append(mutableSyncs[i].upstreamOffset());
+stateString.append(":");
+stateString.append(mutableSyncs[i].downstreamOffset());
+}
+}
+stateString.append("]");
+log.trace("New sync {} applied, new state is {}", offsetSync, 
stateString);
+}
+return mutableSyncs;
+}
+
+private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+clearSyncArray(syncs, firstSync);
+return syncs;
+}
+
+private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+for (int i = 0; i < Long.SIZE; i++) {
+syncs[i] = offsetSync;
+}
+}
+
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+long upstreamOffset = offsetSync.upstreamOffset();
+// Old offsets are invalid, so overwrite them all.
+if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:
   @gharris1727 I agree with almost all of what you said, but I would like to 
focus on the last point:
   
   > > Based on our discussion so far, it seems that monotonicity will not 
allow us to implement "old offset checkpointing after restart", which is not 
ideal. Worst case of breaking monotonicity is sub-optimal failover and extra 
re-processing. Worst case for not translating old offsets after restart is 
never checkpointing a group - then, with the default consumer config 
(auto.offset.reset=latest), failover results in data loss.
   > 
   > I don't think the auto.offset.reset=latest situation is a fair criticism 
of this PR, as that can happen with or without this change. And actually, it's 
more likely to happen without this change, as the translation window is smaller.
   
   Sorry, I'm not trying to critique this PR with these points, but to critique 
the monotonicity requirement. My understanding is that this PR would be the one 
making the monotonicity guarantee "official", and I'm worried that if it is 
indeed a tradeoff between monotonicity and old offset translation, we already 
made a choice, making the latter impossible. I am interested in the future of 
offset translation and checkpointing, and I can think of possible solutions, 
but (again, based on my current understanding) those will be limited if we have 
to keep monotonicity as a guarantee.
   
   As an extra note, I'm familiar with the situation behind KAFKA-13659, and in 
that case, the symptom triggering the investigation was not non-monotonic 
checkpoints, but the weird "feature" of copying source offsets as-is when there 
was no available offset-sync, which caused negative lag in the target cluster. 
The checkpoints were monotonic, but they didn't make sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13259: MINOR: Remove accidental unnecessary code in src

2023-04-24 Thread via GitHub


mimaison commented on PR #13259:
URL: https://github.com/apache/kafka/pull/13259#issuecomment-1519845506

   Thanks for the updates!
   
   There's a spotbugs failure:
   ```
   Bug type EQ_DOESNT_OVERRIDE_EQUALS
   In class org.apache.kafka.connect.runtime.InternalSinkRecord
   
   EQ_DOESNT_OVERRIDE_EQUALS: Class doesn't override equals in superclass
   
   This class extends a class that defines an equals method and adds fields, 
but doesn't define an equals method itself. Thus, equality on instances of this 
class will ignore the identity of the subclass and the added fields. Be sure 
this is what is intended, and that you don't need to override the equals 
method. Even if you don't need to override the equals method, consider 
overriding it anyway to document the fact that the equals method for the 
subclass just return the result of invoking super.equals(o). 
   ```
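   If `InternalSinkRecord` really does want the value semantics it inherits, 
the fix spotbugs suggests is just to make that explicit (a sketch):
   ```java
   @Override
   public boolean equals(Object o) {
       // Equality is intentionally inherited from the superclass
       return super.equals(o);
   }

   @Override
   public int hashCode() {
       return super.hashCode();
   }
   ```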


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13538: KAFKA-14462; [8/N] Add ConsumerGroupMember

2023-04-24 Thread via GitHub


clolov commented on code in PR #13538:
URL: https://github.com/apache/kafka/pull/13538#discussion_r1175040697


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * ConsumerGroupMember contains all the information related to a member
+ * within a consumer group. This class is immutable and is fully backed
+ * by records stored in the __consumer_offsets topic.
+ */
+public class ConsumerGroupMember {
+/**
+ * A builder allowing to create a new member or update an
+ * existing one.
+ *
+ * Please refer to the javadoc of {{@link ConsumerGroupMember}} for the
+ * definition of the fields.
+ */
+public static class Builder {
+private final String memberId;
+private int memberEpoch = 0;
+private int previousMemberEpoch = -1;
+private int nextMemberEpoch = 0;
+private String instanceId = null;
+private String rackId = null;
+private int rebalanceTimeoutMs = -1;
+private String clientId = "";
+private String clientHost = "";
+private List<String> subscribedTopicNames = Collections.emptyList();
+private String subscribedTopicRegex = "";
+private String serverAssignorName = null;
+private List<ClientAssignor> clientAssignors = Collections.emptyList();
+private Map<Uuid, Set<Integer>> assignedPartitions = Collections.emptyMap();
+private Map<Uuid, Set<Integer>> partitionsPendingRevocation = 
Collections.emptyMap();
+private Map<Uuid, Set<Integer>> partitionsPendingAssignment = 
Collections.emptyMap();
+
+public Builder(String memberId) {
+this.memberId = Objects.requireNonNull(memberId);
+}
+
+public Builder(ConsumerGroupMember member) {
+Objects.requireNonNull(member);
+
+this.memberId = member.memberId;
+this.memberEpoch = member.memberEpoch;
+this.previousMemberEpoch = member.previousMemberEpoch;
+this.nextMemberEpoch = member.nextMemberEpoch;
+this.instanceId = member.instanceId;
+this.rackId = member.rackId;
+this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+this.clientId = member.clientId;
+this.clientHost = member.clientHost;
+this.subscribedTopicNames = member.subscribedTopicNames;
+this.subscribedTopicRegex = member.subscribedTopicRegex;
+this.serverAssignorName = member.serverAssignorName;
+this.clientAssignors = member.clientAssignors;
+this.assignedPartitions = member.assignedPartitions;
+this.partitionsPendingRevocation = 
member.partitionsPendingRevocation;
+this.partitionsPendingAssignment = 
member.partitionsPendingAssignment;
+}
+
+public Builder setMemberEpoch(int memberEpoch) {
+this.memberEpoch = memberEpoch;
+return this;
+}
+
+public Builder setPreviousMemberEpoch(int previousMemberEpoch) {
+this.previousMemberEpoch = previousMemberEpoch;
+return this;
+}
+
+public Builder setNextMemberEpoch(int nextMemberEpoch) {
+this.nextMemberEpoch = nextMemberEpoch;
+return this;
+}
+
+public Builder setInstanceId(String instanceId) {
+this.instanceId = instanceId;
+return this;
+}
+
+public Builder maybeUpdateInstanceId(Optional<String> instanceId) {
+this.instanceId = instanceId.orElse(this.instanceId);
+return this;
+}
+
+public 

[jira] [Assigned] (KAFKA-14909) KRaft Controllers not setting ZkMigrationReady tagged field

2023-04-24 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-14909:
-

Assignee: Luke Chen

> KRaft Controllers not setting ZkMigrationReady tagged field
> ---
>
> Key: KAFKA-14909
> URL: https://issues.apache.org/jira/browse/KAFKA-14909
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: David Arthur
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
>
> When sending ApiVersionsResponse to other controllers, the KRaft controller 
> is not setting the ZkMigrationReady field. This means we can't determine if 
> the full KRaft quorum has been properly configured for a migration before 
> triggering the migration.
> As a result, we could start the migration on controller A (which was properly 
> configured), then fail over to controller B (which was not properly 
> configured) and no longer be in dual-write mode.
> The fix is to properly set the ZkMigrationReady tagged field, and to make use 
> of it in KRaftMigrationDriver



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] chia7712 merged pull request #13630: MINOR: fix zookeeper_migration_test.py

2023-04-24 Thread via GitHub


chia7712 merged PR #13630:
URL: https://github.com/apache/kafka/pull/13630


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 commented on pull request #13620: MINOR: fix zookeeper_migration_test.py

2023-04-24 Thread via GitHub


chia7712 commented on PR #13620:
URL: https://github.com/apache/kafka/pull/13620#issuecomment-1519754399

   > I think this should be backported to 3.5 too
   
   see #13630


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 opened a new pull request, #13630: MINOR: fix zookeeper_migration_test.py

2023-04-24 Thread via GitHub


chia7712 opened a new pull request, #13630:
URL: https://github.com/apache/kafka/pull/13630

   backport #13620 to branch-3.5


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 merged pull request #13620: MINOR: fix zookeeper_migration_test.py

2023-04-24 Thread via GitHub


chia7712 merged PR #13620:
URL: https://github.com/apache/kafka/pull/13620


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14712) Confusing error when writing downgraded FeatureImage

2023-04-24 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-14712:
---
Fix Version/s: 3.6.0
   (was: 3.5.0)

> Confusing error when writing downgraded FeatureImage
> 
>
> Key: KAFKA-14712
> URL: https://issues.apache.org/jira/browse/KAFKA-14712
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: David Arthur
>Priority: Minor
>  Labels: good-first-issue
> Fix For: 3.6.0
>
>
> We have logic in ImageWriterOptions which forces any MetadataVersion lower 
> than 3.3-IV0 to be treated as 3.0-IV1. This was because FeatureLevel records 
> were not supported before 3.3-IV0. 
> When FeatureLevel is written at an older version, the "loss handler" produces 
> an error message warning the user that some metadata is being lost.
> For example, when writing a FeatureImage with flag "foo" at MetadataVersion 
> 3.2-IV0, we get a message like:
> > Metadata has been lost because the following could not be represented in 
> > metadata version 3.0-IV1: feature flag(s): foo
> This is confusing since we told the image builder to use MetadataVersion 
> 3.2-IV0, but 3.0-IV1 appears in the text.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14712) Confusing error when writing downgraded FeatureImage

2023-04-24 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715669#comment-17715669
 ] 

Mickael Maison commented on KAFKA-14712:


The ticket is still unassigned so moving to the next release.

> Confusing error when writing downgraded FeatureImage
> 
>
> Key: KAFKA-14712
> URL: https://issues.apache.org/jira/browse/KAFKA-14712
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: David Arthur
>Priority: Minor
>  Labels: good-first-issue
> Fix For: 3.5.0
>
>
> We have logic in ImageWriterOptions which forces any MetadataVersion lower 
> than 3.3-IV0 to be treated as 3.0-IV1. This was because FeatureLevel records 
> were not supported before 3.3-IV0. 
> When FeatureLevel is written at an older version, the "loss handler" produces 
> an error message warning the user that some metadata is being lost.
> For example, when writing a FeatureImage with flag "foo" at MetadataVersion 
> 3.2-IV0, we get a message like:
> > Metadata has been lost because the following could not be represented in 
> > metadata version 3.0-IV1: feature flag(s): foo
> This is confusing since we told the image builder to use MetadataVersion 
> 3.2-IV0, but 3.0-IV1 appears in the text.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14632) Compression optimization: Remove unnecessary intermediate buffers

2023-04-24 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-14632:
---
Fix Version/s: 3.6.0
   (was: 3.5.0)

> Compression optimization: Remove unnecessary intermediate buffers
> -
>
> Key: KAFKA-14632
> URL: https://issues.apache.org/jira/browse/KAFKA-14632
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Major
> Fix For: 3.6.0
>
>
> Remove two layers of buffers (the 16KB one and 2KB one) and replace with a 
> single buffer called decompressionBuffer. The time it takes to prepare a 
> batch for decompression will be bounded by the allocation of largest buffer 
> and hence, using only one large buffer (16KB) doesn’t cause any regression.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14632) Compression optimization: Remove unnecessary intermediate buffers

2023-04-24 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715666#comment-17715666
 ] 

Mickael Maison commented on KAFKA-14632:


PR is still a draft so moving to the next release.

> Compression optimization: Remove unnecessary intermediate buffers
> -
>
> Key: KAFKA-14632
> URL: https://issues.apache.org/jira/browse/KAFKA-14632
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Major
> Fix For: 3.5.0
>
>
> Remove two layers of buffers (the 16KB one and 2KB one) and replace with a 
> single buffer called decompressionBuffer. The time it takes to prepare a 
> batch for decompression will be bounded by the allocation of largest buffer 
> and hence, using only one large buffer (16KB) doesn’t cause any regression.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14186) Add unit tests for BatchFileWriter

2023-04-24 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-14186:
---
Fix Version/s: 3.6.0
   (was: 3.5.0)

> Add unit tests for BatchFileWriter
> --
>
> Key: KAFKA-14186
> URL: https://issues.apache.org/jira/browse/KAFKA-14186
> Project: Kafka
>  Issue Type: Test
>Reporter: David Arthur
>Assignee: Alexandre Dupriez
>Priority: Minor
> Fix For: 3.6.0
>
>
> We have integration tests that cover this class, but no direct unit tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14186) Add unit tests for BatchFileWriter

2023-04-24 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715660#comment-17715660
 ] 

Mickael Maison commented on KAFKA-14186:


I can't see a PR and we're near code freeze so bumping to the next release.

> Add unit tests for BatchFileWriter
> --
>
> Key: KAFKA-14186
> URL: https://issues.apache.org/jira/browse/KAFKA-14186
> Project: Kafka
>  Issue Type: Test
>Reporter: David Arthur
>Assignee: Alexandre Dupriez
>Priority: Minor
> Fix For: 3.5.0
>
>
> We have integration tests that cover this class, but no direct unit tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14839) Exclude protected variable from JavaDocs

2023-04-24 Thread Atul Sharma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715657#comment-17715657
 ] 

Atul Sharma commented on KAFKA-14839:
-

Hi [~mjsax], can I work on this?

> Exclude protected variable from JavaDocs
> 
>
> Key: KAFKA-14839
> URL: https://issues.apache.org/jira/browse/KAFKA-14839
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> Cf 
> [https://kafka.apache.org/31/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#enableSpuriousResultFix]
> The variable `enableSpuriousResultFix` is protected, and it's not public API, 
> and thus should not show up in the JavaDocs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs in 3.5

2023-04-24 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14876:
---
Description: 
Add public documentation for the new Kafka Connect offset management REST API 
being introduced in 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 in 3.5:
 * *GET* /connectors/\{connector}/offsets

  was:
Add public documentation for the new Kafka Connect offset management REST APIs 
being introduced in 
[KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 in 3.5
 * *GET* /connectors/\{connector}/offsets


> Public documentation for new Kafka Connect offset management REST APIs in 3.5
> -
>
> Key: KAFKA-14876
> URL: https://issues.apache.org/jira/browse/KAFKA-14876
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.5.0
>
>
> Add public documentation for the new Kafka Connect offset management REST API 
> being introduced in 
> [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  in 3.5:
>  * *GET* /connectors/\{connector}/offsets



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14302) Infinite probing rebalance if a changelog topic got emptied

2023-04-24 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715656#comment-17715656
 ] 

Mickael Maison commented on KAFKA-14302:


The ticket is still unassigned so moving to the next release.

> Infinite probing rebalance if a changelog topic got emptied
> ---
>
> Key: KAFKA-14302
> URL: https://issues.apache.org/jira/browse/KAFKA-14302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
>Reporter: Damien Gasparina
>Priority: Critical
> Fix For: 3.5.0
>
> Attachments: image-2022-10-14-12-04-01-190.png, logs.tar.gz2
>
>
> If a store with a changelog topic has been fully emptied, it can generate 
> infinite probing rebalances.
>  
> The scenario is the following:
>  * A Kafka Streams application, deployed on many instances, has a store with 
> a changelog
>  * Many entries are pushed into the changelog, so the log-end offset is 
> high, let's say 20,000
>  * Then the store gets emptied, either due to data retention (windowing) or 
> tombstones
>  * Then an instance of the application is restarted, and its local disk is 
> deleted (e.g. Kubernetes without a Persistent Volume)
>  * After the restart, the application restores the store from the changelog, 
> but does not write a checkpoint file as there is no data
>  * As there are no checkpoint entries, this instance reports a taskOffsetSums 
> entry with the offset set to 0 in the subscriptionUserData
>  * The group leader, during the assignment, then computes a lag of 20,000 (end 
> offset - task offset), which is greater than the default acceptable lag, and 
> thus decides to schedule a probing rebalance (see the sketch after the 
> reproduction snippet below)
>  * In the next probing rebalance, nothing has changed, so... a new probing 
> rebalance
>  
> I was able to reproduce locally with a simple topology:
>  
> {code:java}
> var table = streamsBuilder.stream("table");
> streamsBuilder
> .stream("stream")
> .join(table, (eSt, eTb) -> eSt.toString() + eTb.toString(), 
> JoinWindows.of(Duration.ofSeconds(5)))
> .to("output");{code}
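
For illustration, a minimal sketch (not the actual StreamsPartitionAssignor 
code) of the lag check described in the scenario above, assuming the default 
acceptable.recovery.lag of 10,000:

{code:java}
// Minimal sketch of the probing-rebalance trigger described in the scenario.
// Not the real assignor logic; the values come from the example above.
public class ProbingRebalanceSketch {
    public static void main(String[] args) {
        long endOffset = 20_000L;             // changelog log-end offset
        long taskOffset = 0L;                 // no checkpoint file, so 0 is reported
        long acceptableRecoveryLag = 10_000L; // Streams default for acceptable.recovery.lag

        long lag = endOffset - taskOffset;
        if (lag > acceptableRecoveryLag) {
            // The leader considers the instance not caught up and schedules a
            // probing rebalance; with an emptied changelog the lag never
            // shrinks, so this repeats indefinitely.
            System.out.println("lag=" + lag + " > " + acceptableRecoveryLag
                    + ": schedule probing rebalance");
        }
    }
}
{code}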
>  
>  
>  
> Due to this issue, applications having an empty changelog experience 
> frequent rebalances:
> !image-2022-10-14-12-04-01-190.png!
>  
> With assignments similar to:
> {code:java}
> [hello-world-8323d214-4c56-470f-bace-e4291cdf10eb-StreamThread-3] INFO 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - 
> stream-thread 
> [hello-world-8323d214-4c56-470f-bace-e4291cdf10eb-StreamThread-3-consumer] 
> Assigned tasks [0_5, 0_4, 0_3, 0_2, 0_1, 0_0] including stateful [0_5, 0_4, 
> 0_3, 0_2, 0_1, 0_0] to clients as: 
> d0e2d556-2587-48e8-b9ab-43a4e8207be6=[activeTasks: ([]) standbyTasks: ([0_0, 
> 0_1, 0_2, 0_3, 0_4, 0_5])]
> 8323d214-4c56-470f-bace-e4291cdf10eb=[activeTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, 
> 0_5]) standbyTasks: ([])].{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14302) Infinite probing rebalance if a changelog topic got emptied

2023-04-24 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-14302:
---
Fix Version/s: 3.6.0
   (was: 3.5.0)

> Infinite probing rebalance if a changelog topic got emptied
> ---
>
> Key: KAFKA-14302
> URL: https://issues.apache.org/jira/browse/KAFKA-14302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
>Reporter: Damien Gasparina
>Priority: Critical
> Fix For: 3.6.0
>
> Attachments: image-2022-10-14-12-04-01-190.png, logs.tar.gz2
>
>
> If a store with a changelog topic has been fully emptied, it can generate 
> infinite probing rebalances.
>  
> The scenario is the following:
>  * A Kafka Streams application, deployed on many instances, has a store with 
> a changelog
>  * Many entries are pushed into the changelog, so the log-end offset is 
> high, let's say 20,000
>  * Then the store gets emptied, either due to data retention (windowing) or 
> tombstones
>  * Then an instance of the application is restarted, and its local disk is 
> deleted (e.g. Kubernetes without a Persistent Volume)
>  * After the restart, the application restores the store from the changelog, 
> but does not write a checkpoint file as there is no data
>  * As there are no checkpoint entries, this instance reports a taskOffsetSums 
> entry with the offset set to 0 in the subscriptionUserData
>  * The group leader, during the assignment, then computes a lag of 20,000 (end 
> offset - task offset), which is greater than the default acceptable lag, and 
> thus decides to schedule a probing rebalance
>  * In the next probing rebalance, nothing has changed, so... a new probing 
> rebalance
>  
> I was able to reproduce locally with a simple topology:
>  
> {code:java}
> var table = streamsBuilder.stream("table");
> streamsBuilder
> .stream("stream")
> .join(table, (eSt, eTb) -> eSt.toString() + eTb.toString(), 
> JoinWindows.of(Duration.ofSeconds(5)))
> .to("output");{code}
>  
>  
>  
> Due to this issue, applications having an empty changelog experience 
> frequent rebalances:
> !image-2022-10-14-12-04-01-190.png!
>  
> With assignments similar to:
> {code:java}
> [hello-world-8323d214-4c56-470f-bace-e4291cdf10eb-StreamThread-3] INFO 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - 
> stream-thread 
> [hello-world-8323d214-4c56-470f-bace-e4291cdf10eb-StreamThread-3-consumer] 
> Assigned tasks [0_5, 0_4, 0_3, 0_2, 0_1, 0_0] including stateful [0_5, 0_4, 
> 0_3, 0_2, 0_1, 0_0] to clients as: 
> d0e2d556-2587-48e8-b9ab-43a4e8207be6=[activeTasks: ([]) standbyTasks: ([0_0, 
> 0_1, 0_2, 0_3, 0_4, 0_5])]
> 8323d214-4c56-470f-bace-e4291cdf10eb=[activeTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, 
> 0_5]) standbyTasks: ([])].{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig

2023-04-24 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-6527:
--
Fix Version/s: 3.6.0
   (was: 3.5.0)

> Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
> 
>
> Key: KAFKA-6527
> URL: https://issues.apache.org/jira/browse/KAFKA-6527
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
>  Labels: flakey
> Fix For: 3.6.0
>
>
> {code:java}
> java.lang.AssertionError: Log segment size increase not applied
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig

2023-04-24 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715654#comment-17715654
 ] 

Mickael Maison commented on KAFKA-6527:
---

The test is currently disabled and the ticket is unassigned, so I'm moving it to 
the next release.

> Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
> 
>
> Key: KAFKA-6527
> URL: https://issues.apache.org/jira/browse/KAFKA-6527
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
>  Labels: flakey
> Fix For: 3.5.0
>
>
> {code:java}
> java.lang.AssertionError: Log segment size increase not applied
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-8280) Flaky Test DynamicBrokerReconfigurationTest#testUncleanLeaderElectionEnable

2023-04-24 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-8280:
--
Fix Version/s: 3.6.0
   (was: 3.5.0)

> Flaky Test DynamicBrokerReconfigurationTest#testUncleanLeaderElectionEnable
> ---
>
> Key: KAFKA-8280
> URL: https://issues.apache.org/jira/browse/KAFKA-8280
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0
>Reporter: John Roesler
>Priority: Blocker
>  Labels: flakey
> Fix For: 3.6.0
>
>
> I saw this fail again on 
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3979/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testUncleanLeaderElectionEnable/
> {noformat}
> Error Message
> java.lang.AssertionError: Unclean leader not elected
> Stacktrace
> java.lang.AssertionError: Unclean leader not elected
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(DynamicBrokerReconfigurationTest.scala:510)
> {noformat}
> {noformat}
> Standard Output
> Completed Updating config for entity: brokers '0'.
> Completed Updating config for entity: brokers '1'.
> Completed Updating config for entity: brokers '2'.
> [2019-04-23 01:17:11,690] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition testtopic-6 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,690] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition testtopic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,722] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition testtopic-7 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,722] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition testtopic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,760] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=1] Error for partition testtopic-6 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,761] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=1] Error for partition testtopic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,765] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition testtopic-3 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,765] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition testtopic-9 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,779] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-8 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-38 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-2 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-14 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> 

[jira] [Updated] (KAFKA-7957) Flaky Test DynamicBrokerReconfigurationTest#testMetricsReporterUpdate

2023-04-24 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-7957:
--
Fix Version/s: 3.6.0
   (was: 3.5.0)

> Flaky Test DynamicBrokerReconfigurationTest#testMetricsReporterUpdate
> -
>
> Key: KAFKA-7957
> URL: https://issues.apache.org/jira/browse/KAFKA-7957
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.6.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/18/]
> {quote}java.lang.AssertionError: Messages not sent at 
> kafka.utils.TestUtils$.fail(TestUtils.scala:356) at 
> kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:766) at 
> kafka.server.DynamicBrokerReconfigurationTest.startProduceConsume(DynamicBrokerReconfigurationTest.scala:1270)
>  at 
> kafka.server.DynamicBrokerReconfigurationTest.testMetricsReporterUpdate(DynamicBrokerReconfigurationTest.scala:650){quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

