[jira] [Updated] (KAFKA-12889) Log cleaner grouping should consider empty log segments to avoid leaving empty log files

2021-06-10 Thread qiang Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

qiang Liu updated KAFKA-12889:
--
Affects Version/s: 3.1.0
   2.8.0

> Log cleaner grouping should consider empty log segments to avoid leaving empty log files
> --
>
> Key: KAFKA-12889
> URL: https://issues.apache.org/jira/browse/KAFKA-12889
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 0.10.1.1, 2.8.0, 3.1.0
>Reporter: qiang Liu
>Priority: Trivial
>
> To avoid overflowing the 4-byte relative offsets in the log index, the log 
> cleaner groups log segments and checks their offsets to make sure a group's 
> offset range does not exceed Int.MaxValue.
> This offset check currently does not consider whether the next log segment 
> is empty, so an empty log file is left behind roughly every 2^31 messages.
> The leftover empty logs are reprocessed on every clean cycle, which rewrites 
> them with the same empty content and causes a small amount of unnecessary IO.
> For the __consumer_offsets topic, we can normally set cleanup.policy to 
> compact,delete to get rid of this.
> My cluster is on 0.10.1.1, but after analyzing the trunk code, it should 
> have the same problem too.
>  
> Some of my leftover empty log files (output of ls -l):
> -rw-r- 1 u g 0 Dec 16 2017 .index
> -rw-r- 1 u g 0 Dec 16 2017 .log
> -rw-r- 1 u g 0 Dec 16 2017 .timeindex
> -rw-r- 1 u g 0 Jan  15 2018 002148249632.index
> -rw-r- 1 u g 0 Jan  15 2018 002148249632.log
> -rw-r- 1 u g 0 Jan  15 2018 002148249632.timeindex
> -rw-r- 1 u g 0 Jan  27 2018 004295766494.index
> -rw-r- 1 u g 0 Jan  27 2018 004295766494.log
> -rw-r- 1 u g 0 Jan  27 2018 004295766494.timeindex
>  
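
A minimal sketch of the grouping rule this report implies, assuming a 
hypothetical Segment accessor surface (this is not the actual LogCleaner 
code): keep each group's offset range within Int.MaxValue so the 4-byte 
relative index offsets cannot overflow, but let empty segments join the 
current group, since they contribute no index entries.

{code:java}
import java.util.ArrayList;
import java.util.List;

class SegmentGrouping {
    // Hypothetical accessor surface; the real segment type differs.
    interface Segment {
        long baseOffset();
        long nextOffset();
        int sizeInBytes();
    }

    static List<List<Segment>> group(List<Segment> segments) {
        List<List<Segment>> groups = new ArrayList<>();
        List<Segment> current = new ArrayList<>();
        long firstOffset = -1L;
        for (Segment seg : segments) {
            boolean empty = seg.sizeInBytes() == 0;
            long lastOffset = seg.nextOffset() - 1;
            if (current.isEmpty()) {
                firstOffset = seg.baseOffset();
            } else if (!empty && lastOffset - firstOffset > Integer.MAX_VALUE) {
                groups.add(current);      // close the group before the range overflows
                current = new ArrayList<>();
                firstOffset = seg.baseOffset();
            }
            current.add(seg);             // empty segments never widen the range
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }
}
{code}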



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore

2021-06-10 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361374#comment-17361374
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12935:


Filed https://issues.apache.org/jira/browse/KAFKA-12936

> Flaky Test 
> RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
> 
>
> Key: KAFKA-12935
> URL: https://issues.apache.org/jira/browse/KAFKA-12935
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> {quote}java.lang.AssertionError: Expected: <0L> but: was <5005L> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
> org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(RestoreIntegrationTest.java:374)
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12936) In-memory stores are always restored from scratch after dropping out of the group

2021-06-10 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12936:
--

 Summary: In-memory stores are always restored from scratch after 
dropping out of the group
 Key: KAFKA-12936
 URL: https://issues.apache.org/jira/browse/KAFKA-12936
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


Whenever an in-memory store is closed, the actual store contents are garbage 
collected and the state will need to be restored from scratch if the task is 
reassigned and re-initialized. We introduced the recycling feature to prevent 
this from occurring when a task is transitioned from standby to active (or vice 
versa), but it's still possible for the in-memory state to be unnecessarily 
wiped out in the case the member has dropped out of the group. In this case, 
the onPartitionsLost callback is invoked, which will close all active tasks as 
dirty before the member rejoins the group. This means that all these tasks will 
need to be restored from scratch if they are reassigned back to this consumer.
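
For reference, the client-side hook involved is 
ConsumerRebalanceListener#onPartitionsLost. A minimal sketch of the path 
described above, with the Streams-internal task handling reduced to comments 
(the real internals are more involved):

{code:java}
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

class LostPartitionsListener implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) { }

    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
        // normal rebalance path: tasks can be closed clean or recycled
    }

    @Override
    public void onPartitionsLost(final Collection<TopicPartition> partitions) {
        // member dropped out of the group: tasks are closed dirty, and for
        // in-memory stores that discards the contents entirely, forcing a
        // full restore from the changelog if the task is reassigned here
    }
}
{code}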



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore

2021-06-10 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361367#comment-17361367
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12935:


Hm, actually, I guess that could be considered a bug in itself, or at least a 
flaw in the recycling feature – for persistent stores with ALOS, dropping out 
of the group only causes tasks to be closed dirty; it doesn't force them to be 
wiped out and restored from the changelog from scratch. But with in-memory 
stores, simply closing them is akin to physically wiping out the state 
directory for that task. Avoiding that was the basis for this recycling feature 
in the first place.

This does kind of suck, but at least it should be a relatively rare event. I'm 
a bit worried about how much complexity it would introduce to the code to fix 
this "bug", but I'll at least file a ticket for it now and we can go from there.

> Flaky Test 
> RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
> 
>
> Key: KAFKA-12935
> URL: https://issues.apache.org/jira/browse/KAFKA-12935
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> {quote}java.lang.AssertionError: Expected: <0L> but: was <5005L> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
> org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(RestoreIntegrationTest.java:374)
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore

2021-06-10 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361359#comment-17361359
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12935:


Hm, yeah I think I've seen this fail once or twice before. I did look into it a 
bit a while back, and just could not figure out whether it was a possible bug 
or an issue with the test itself. My money's definitely on the latter, but it 
might be worth taking another look sometime if we have the chance. 

If there is a bug that this is uncovering, at least it would not be a 
correctness bug, only an annoyance in restoring when it's not necessary. I 
think it's more likely that the test is flaky because for example the consumer 
dropped out of the group and invoked onPartitionsLost, which would close the 
task as dirty and require restoring from the changelog

> Flaky Test 
> RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
> 
>
> Key: KAFKA-12935
> URL: https://issues.apache.org/jira/browse/KAFKA-12935
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> {quote}java.lang.AssertionError: Expected: <0L> but: was <5005L> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
> org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(RestoreIntegrationTest.java:374)
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12851) Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable

2021-06-10 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361312#comment-17361312
 ] 

Matthias J. Sax commented on KAFKA-12851:
-

Failed again.

> Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable
> ---
>
> Key: KAFKA-12851
> URL: https://issues.apache.org/jira/browse/KAFKA-12851
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0
>
>
> Failed twice on a [PR 
> build|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10755/6/testReport/]
> h3. Stacktrace
> org.opentest4j.AssertionFailedError: expected: <true> but was: <false> at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35) at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162) at 
> org.apache.kafka.raft.RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable(RaftEventSimulationTest.java:263)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12934) Move some controller classes to the metadata package

2021-06-10 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12934.
--
Fix Version/s: 3.0.0
 Reviewer: Jason Gustafson
   Resolution: Fixed

> Move some controller classes to the metadata package
> 
>
> Key: KAFKA-12934
> URL: https://issues.apache.org/jira/browse/KAFKA-12934
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Minor
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> Move some controller classes to the metadata package so that they can be used 
> with broker snapshots.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore

2021-06-10 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-12935:
---

 Summary: Flaky Test 
RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
 Key: KAFKA-12935
 URL: https://issues.apache.org/jira/browse/KAFKA-12935
 Project: Kafka
  Issue Type: Test
  Components: streams, unit tests
Reporter: Matthias J. Sax


{quote}java.lang.AssertionError: Expected: <0L> but: was <5005L> at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(RestoreIntegrationTest.java:374)
{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe opened a new pull request #10865: KAFKA-12934: Move some controller classes to the metadata package

2021-06-10 Thread GitBox


cmccabe opened a new pull request #10865:
URL: https://github.com/apache/kafka/pull/10865


   Move some controller classes to the metadata package so that they can be
   used with broker snapshots. Rename ControllerTestUtils to
   RecordTestUtils. Move PartitionInfo to PartitionRegistration.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12934) Move some controller classes to the metadata package

2021-06-10 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-12934:


 Summary: Move some controller classes to the metadata package
 Key: KAFKA-12934
 URL: https://issues.apache.org/jira/browse/KAFKA-12934
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe
Assignee: Colin McCabe


Move some controller classes to the metadata package so that they can be used 
with broker snapshots.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12934) Move some controller classes to the metadata package

2021-06-10 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-12934:
-
Component/s: controller
 Labels: kip-500  (was: )

> Move some controller classes to the metadata package
> 
>
> Key: KAFKA-12934
> URL: https://issues.apache.org/jira/browse/KAFKA-12934
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Minor
>  Labels: kip-500
>
> Move some controller classes to the metadata package so that they can be used 
> with broker snapshots.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kpatelatwork edited a comment on pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-10 Thread GitBox


kpatelatwork edited a comment on pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#issuecomment-859064545


   Passing system tests results
   
![image](https://user-images.githubusercontent.com/29556518/121596558-32270f80-ca05-11eb-843d-9a23824b98fd.png)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-10 Thread GitBox


kpatelatwork commented on pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#issuecomment-859064545


   Passing system tests 
   
![image](https://user-images.githubusercontent.com/29556518/121596558-32270f80-ca05-11eb-843d-9a23824b98fd.png)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10736: KAFKA-9295: revert session timeout to default value

2021-06-10 Thread GitBox


ableegoldman merged pull request #10736:
URL: https://github.com/apache/kafka/pull/10736


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10862: KAFKA-12928: Add a check whether the Task's statestore is actually a directory

2021-06-10 Thread GitBox


ableegoldman commented on a change in pull request #10862:
URL: https://github.com/apache/kafka/pull/10862#discussion_r649497973



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -312,7 +319,7 @@ private boolean taskDirIsEmpty(final File taskDir) {
  */
 File globalStateDir() {
 final File dir = new File(stateDir, "global");
-if (hasPersistentStores && !dir.exists() && !dir.mkdir()) {
+if (hasPersistentStores && ((dir.exists() && !dir.isDirectory()) || 
(!dir.exists() && !dir.mkdir()))) {
 throw new ProcessorStateException(
 String.format("global state directory [%s] doesn't exist and 
couldn't be created", dir.getPath()));

Review comment:
   ditto here, please add a separate check and exception

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -126,7 +126,7 @@ public StateDirectory(final StreamsConfig config, final 
Time time, final boolean
 throw new ProcessorStateException(
 String.format("base state directory [%s] doesn't exist and 
couldn't be created", stateDirName));
 }
-if (!stateDir.exists() && !stateDir.mkdir()) {
+if ((stateDir.exists() && !stateDir.isDirectory()) || 
(!stateDir.exists() && !stateDir.mkdir())) {

Review comment:
   Please split this up into a separate check for `if ((stateDir.exists() 
&& !stateDir.isDirectory())` and then throw an accurate exception, eg `state 
directory could not be created as there is an existing file with the same name`
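
   A minimal sketch of the suggested split, with illustrative (not final) 
message text:

{code:java}
import java.io.File;
import org.apache.kafka.streams.errors.ProcessorStateException;

class DirectoryCheckSketch {
    // Report "file exists with the same name" separately from
    // "could not be created".
    static void ensureDirectory(final File dir) {
        if (dir.exists() && !dir.isDirectory()) {
            throw new ProcessorStateException(String.format(
                "state directory [%s] can't be created as there is an existing file with the same name",
                dir.getPath()));
        }
        if (!dir.exists() && !dir.mkdir()) {
            throw new ProcessorStateException(String.format(
                "state directory [%s] doesn't exist and couldn't be created",
                dir.getPath()));
        }
    }
}
{code}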

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -230,18 +230,25 @@ public UUID initializeProcessId() {
 public File getOrCreateDirectoryForTask(final TaskId taskId) {
 final File taskParentDir = getTaskDirectoryParentName(taskId);
 final File taskDir = new File(taskParentDir, 
StateManagerUtil.toTaskDirString(taskId));
-if (hasPersistentStores && !taskDir.exists()) {
-synchronized (taskDirCreationLock) {
-// to avoid a race condition, we need to check again if the 
directory does not exist:
-// otherwise, two threads might pass the outer `if` (and enter 
the `then` block),
-// one blocks on `synchronized` while the other creates the 
directory,
-// and the blocking one fails when trying to create it after 
it's unblocked
-if (!taskParentDir.exists() && !taskParentDir.mkdir()) {
-throw new ProcessorStateException(
+if (hasPersistentStores) {
+if (!taskDir.exists()) {
+synchronized (taskDirCreationLock) {
+// to avoid a race condition, we need to check again if 
the directory does not exist:
+// otherwise, two threads might pass the outer `if` (and 
enter the `then` block),
+// one blocks on `synchronized` while the other creates 
the directory,
+// and the blocking one fails when trying to create it 
after it's unblocked
+if (!taskParentDir.exists() && !taskParentDir.mkdir()) {
+throw new ProcessorStateException(
 String.format("Parent [%s] of task directory [%s] 
doesn't exist and couldn't be created",
-taskParentDir.getPath(), 
taskDir.getPath()));
+taskParentDir.getPath(), taskDir.getPath()));
+}
+if (!taskDir.exists() && !taskDir.mkdir()) {
+throw new ProcessorStateException(
+String.format("task directory [%s] doesn't exist 
and couldn't be created", taskDir.getPath()));
+}
 }
-if (!taskDir.exists() && !taskDir.mkdir()) {
+} else {
+if (!taskDir.isDirectory()) {

Review comment:
   Same here, this exception message does not apply to the case this is 
trying to catch




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

2021-06-10 Thread GitBox


ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-859044497


   It should be sufficient to upgrade just the consumers; this is a client-side 
fix only.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #10856: MINOR: Small optimizations and removal of unused code in Streams

2021-06-10 Thread GitBox


vvcephei commented on a change in pull request #10856:
URL: https://github.com/apache/kafka/pull/10856#discussion_r649509926



##
File path: 
streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java
##
@@ -25,8 +25,8 @@
  */
 public class StoreQueryParameters {
 
-private Integer partition;
-private boolean staleStores;
+private final Integer partition;
+private final boolean staleStores;

Review comment:
   Yeah, it's a limitation of checkstyle, which makes me a bit sad.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10855: MINOR: clean up unneeded `@SuppressWarnings` on Streams module

2021-06-10 Thread GitBox


jlprat commented on pull request #10855:
URL: https://github.com/apache/kafka/pull/10855#issuecomment-859028893


   Unless the CI is still broken, of course


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #10850: KAFKA-12924 Replace EasyMock and PowerMock with Mockito in streams (metrics)

2021-06-10 Thread GitBox


vvcephei commented on pull request #10850:
URL: https://github.com/apache/kafka/pull/10850#issuecomment-859023848


   Oh, right, I meant to say that the core integration tests are broken right 
now. I've just run the Streams tests on my laptop, and we also have passing 
tests for the ARM build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #10850: KAFKA-12924 Replace EasyMock and PowerMock with Mockito in streams (metrics)

2021-06-10 Thread GitBox


vvcephei commented on pull request #10850:
URL: https://github.com/apache/kafka/pull/10850#issuecomment-859022980


   Thanks for the updates, @wycc ! Merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #10850: KAFKA-12924 Replace EasyMock and PowerMock with Mockito in streams (metrics)

2021-06-10 Thread GitBox


vvcephei merged pull request #10850:
URL: https://github.com/apache/kafka/pull/10850


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #10850: KAFKA-12924 Replace EasyMock and PowerMock with Mockito in streams (metrics)

2021-06-10 Thread GitBox


vvcephei commented on a change in pull request #10850:
URL: https://github.com/apache/kafka/pull/10850#discussion_r649499312



##
File path: 
streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
##
@@ -135,45 +124,38 @@ public void shouldGetFailedStreamThreadsSensor() {
 false,
 description
 );
-replay(StreamsMetricsImpl.class, streamsMetrics);
 
 final Sensor sensor = 
ClientMetrics.failedStreamThreadSensor(streamsMetrics);
-
-verify(StreamsMetricsImpl.class, streamsMetrics);

Review comment:
   I see. I thought it also verified that the mocked methods actually got 
used.
   
   Looking at the test closer, though, I think that additional verification 
isn't really needed here.
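
   For readers following the EasyMock-to-Mockito migration, a generic 
illustration (not the ClientMetrics code) of why the replay/verify pair 
disappears: Mockito records interactions automatically and checks only the 
ones asserted explicitly.

{code:java}
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import java.util.List;

class ReplayVerifyVsMockito {
    @SuppressWarnings("unchecked")
    void example() {
        // EasyMock would need replay(mock) here before any use.
        final List<String> collaborator = mock(List.class);

        collaborator.add("sensor");             // the code under test runs

        // EasyMock's verify(mock) checks every recorded expectation;
        // Mockito checks only what you explicitly assert.
        verify(collaborator).add("sensor");
        verifyNoMoreInteractions(collaborator); // optional stricter check
    }
}
{code}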




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

2021-06-10 Thread GitBox


vvcephei merged pull request #9414:
URL: https://github.com/apache/kafka/pull/9414


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10855: MINOR: clean up unneeded `@SuppressWarnings` on Streams module

2021-06-10 Thread GitBox


jlprat commented on pull request #10855:
URL: https://github.com/apache/kafka/pull/10855#issuecomment-858998167


   @mjsax Could you restart the build? Last time it failed with the `1` exit 
code


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2021-06-10 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361198#comment-17361198
 ] 

Josep Prat commented on KAFKA-8940:
---

Hi [~ableegoldman] I read your comment about a month ago, but as I didn't see it 
failing again, I thought it might have been fixed (maybe by some refactoring). 
But I agree with your judgement. I vote for disabling this test and creating a 
new ticket to rewrite it properly.

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-8940
> URL: https://issues.apache.org/jira/browse/KAFKA-8940
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: flaky-test, newbie++
>
> The test does not properly account for windowing. See this comment for full 
> details.
> We can patch this test by fixing the timestamps of the input data to avoid 
> crossing over a window boundary, or account for this when verifying the 
> output. Since we have access to the input data it should be possible to 
> compute whether/when we do cross a window boundary, and adjust the expected 
> output accordingly
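
A minimal illustration of the boundary check suggested above; the window size 
is an assumption for the example, not the test's actual configuration:

{code:java}
import java.time.Duration;

class WindowBoundaryCheck {
    // Assumed window size for illustration only.
    static final long WINDOW_SIZE_MS = Duration.ofSeconds(30).toMillis();

    // With tumbling windows of size W, two timestamps fall into the same
    // window iff they share the same window start (ts - ts % W).
    static boolean sameWindow(final long ts1, final long ts2) {
        return (ts1 - ts1 % WINDOW_SIZE_MS) == (ts2 - ts2 % WINDOW_SIZE_MS);
    }
}
{code}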



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

2021-06-10 Thread GitBox


vvcephei commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-858990902


   It looks like the core integration tests have gotten into bad shape. They've 
been failing on trunk as well. I just ran the Streams integration tests on my 
machine, and they passed, so I'll go ahead and merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

2021-06-10 Thread GitBox


mumrah commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r649482956



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -294,19 +298,14 @@ final class KafkaMetadataLog private (
 }
   }
 
-  override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): 
Boolean = {
+  override def deleteBeforeSnapshot(deleteBeforeSnapshotId: OffsetAndEpoch): 
Boolean = {

Review comment:
   Since this is no longer used to increase the Log Start Offset, I think 
this method can go away.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah opened a new pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

2021-06-10 Thread GitBox


mumrah opened a new pull request #10864:
URL: https://github.com/apache/kafka/pull/10864


   This PR includes changes to KafkaRaftClient and KafkaMetadataLog to support 
periodic cleaning of old log segments and snapshots. 
   
   TODO the rest of the description
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2021-06-10 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361196#comment-17361196
 ] 

A. Sophie Blee-Goldman commented on KAFKA-8940:
---

Hey [~mjsax] [~josep.prat] (and others), if you read my last comment on this 
ticket it explains exactly why this test is failing. Luckily it has to do with 
only the test itself, as it's a bug in the assumptions for the generated input. 
It's just not necessarily a quick fix. 

Maybe we should @Ignore it for now, and then file a separate ticket to circle 
back and correct the assumptions in this test.

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-8940
> URL: https://issues.apache.org/jira/browse/KAFKA-8940
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: flaky-test, newbie++
>
> The test does not properly account for windowing. See this comment for full 
> details.
> We can patch this test by fixing the timestamps of the input data to avoid 
> crossing over a window boundary, or account for this when verifying the 
> output. Since we have access to the input data it should be possible to 
> compute whether/when we do cross a window boundary, and adjust the expected 
> output accordingly



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12468) Initial offsets are copied from source to target cluster

2021-06-10 Thread Amber Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361084#comment-17361084
 ] 

Amber Liu edited comment on KAFKA-12468 at 6/10/21, 7:41 PM:
-

I was using standalone mode with an active-passive setup and saw negative 
offsets in the past as well. One of the causes I found was consumer request 
timeouts, e.g. org.apache.kafka.common.errors.DisconnectException. I increased 
the request timeout and tasks.max, and the offsets are synced correctly now.
{code:java}
# consumer, need to set higher timeout
source.admin.request.timeout.ms = 18
source.consumer.request.timeout.ms = 18{code}
 


was (Author: aaamber):
I was using standalone mode with active-passive setup and saw negative offsets 
in the past as well. One of the reasons I found was due to consumer request 
timeout, e.g. org.apache.kafka.common.errors.DisconnectException. I increased 
the request timeout and tasks.max and the offsets are synced correctly now.
{code:java}
# consumer, need to set higher timeout
source.admin.request.timeout.ms = 18
source.consumer.session.timeout.ms = 18
source.consumer.request.timeout.ms = 18{code}
 

> Initial offsets are copied from source to target cluster
> 
>
> Key: KAFKA-12468
> URL: https://issues.apache.org/jira/browse/KAFKA-12468
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Bart De Neuter
>Priority: Major
>
> We have an active-passive setup where the 3 connectors from MirrorMaker 2 
> (heartbeat, checkpoint and source) are running on a dedicated Kafka Connect 
> cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it 
> seems the offsets from the source cluster are initially copied to the target 
> cluster without translation. This causes a negative lag for all synced 
> consumer groups. Only after we reset the offsets for each topic/partition on 
> the target cluster and produce a record on the topic/partition in the source 
> does the sync start working correctly. 
> I would expect that the consumer groups are synced but that the current 
> offsets of the source cluster are not copied to the target cluster.
> This is the configuration we are currently using:
> Heartbeat connector
>  
> {code:xml}
> {
>   "name": "mm2-mirror-heartbeat",
>   "config": {
> "name": "mm2-mirror-heartbeat",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "1",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
> Checkpoint connector:
> {code:xml}
> {
>   "name": "mm2-mirror-checkpoint",
>   "config": {
> "name": "mm2-mirror-checkpoint",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  Source connector:
> {code:xml}
> {
>   "name": "mm2-mirror-source",
>   "config": {
> "name": "mm2-mirror-source",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorSourceConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> 

[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-10 Thread GitBox


jolshan commented on a change in pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#discussion_r649472712



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
##
@@ -92,6 +93,13 @@ public MetadataRequest build(short version) {
 if (!data.allowAutoTopicCreation() && version < 4)
 throw new UnsupportedVersionException("MetadataRequest 
versions older than 4 don't support the " +
 "allowAutoTopicCreation field");
+if (data.topics() != null) {

Review comment:
   I think this one is needed, as we can't remove the `()` in Java files. I 
tried to remove all of the ones in Scala files.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-10 Thread GitBox


jolshan commented on a change in pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#discussion_r649460759



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
##
@@ -92,6 +93,13 @@ public MetadataRequest build(short version) {
 if (!data.allowAutoTopicCreation() && version < 4)
 throw new UnsupportedVersionException("MetadataRequest 
versions older than 4 don't support the " +
 "allowAutoTopicCreation field");
+if (data.topics() != null) {

Review comment:
   will do




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12870) RecordAccumulator stuck in a flushing state

2021-06-10 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-12870:
---

Assignee: Jason Gustafson

> RecordAccumulator stuck in a flushing state
> ---
>
> Key: KAFKA-12870
> URL: https://issues.apache.org/jira/browse/KAFKA-12870
> Project: Kafka
>  Issue Type: Bug
>  Components: producer , streams
>Affects Versions: 2.5.1, 2.8.0, 2.7.1, 2.6.2
>Reporter: Niclas Lockner
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.0.0
>
> Attachments: RecordAccumulator.log, full.log
>
>
> After a Kafka Stream with exactly once enabled has performed its first 
> commit, the RecordAccumulator within the stream's internal producer gets 
> stuck in a state where all subsequent ProducerBatches that get allocated are 
> immediately flushed instead of being held in memory until they expire, 
> regardless of the stream's linger or batch size config.
> This is reproduced in the example code found at 
> [https://github.com/niclaslockner/kafka-12870] which can be run with 
> ./gradlew run --args=
> The example has a producer that sends 1 record/sec to one topic, and a Kafka 
> stream with EOS enabled that forwards the records from that topic to another 
> topic with the configuration linger = 5 sec, commit interval = 10 sec.
>  
> The expected behavior when running the example is that the stream's 
> ProducerBatches will expire (or get flushed because of the commit) every 5th 
> second, and that the stream's producer will send a ProduceRequest every 5th 
> second with an expired ProducerBatch that contains 5 records.
> The actual behavior is that the ProducerBatch is made immediately available 
> for the Sender, and the Sender sends one ProduceRequest for each record.
>  
> The example code contains a copy of the RecordAccumulator class (copied from 
> kafka-clients 2.8.0) with some additional logging added to
>  * RecordAccumulator#ready(Cluster, long)
>  * RecordAccumulator#beginFlush()
>  * RecordAccumulator#awaitFlushCompletion()
> These log entries show (see the attached RecordsAccumulator.log)
>  * that the batches are considered sendable because a flush is in progress
>  * that Sender.maybeSendAndPollTransactionalRequest() calls 
> RecordAccumulator's beginFlush() without also calling awaitFlushCompletion(), 
> and that this makes RecordAccumulator's flushesInProgress jump between 1-2 
> instead of the expected 0-1.
>  
> This issue is not reproducible in version 2.3.1 or 2.4.1.
>  
>  
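
The imbalance described above, reduced to a sketch: the field and method names 
follow the ones quoted from kafka-clients 2.8.0 (flushesInProgress, 
beginFlush, awaitFlushCompletion), but the bodies are simplified assumptions, 
not the real implementation.

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

class FlushBookkeepingSketch {
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);

    void beginFlush() {
        flushesInProgress.getAndIncrement();
    }

    void awaitFlushCompletion() {
        try {
            // ... wait for all incomplete batches to complete ...
        } finally {
            flushesInProgress.decrementAndGet();
        }
    }

    // Batches are treated as immediately sendable while this is true, so a
    // beginFlush() without a matching awaitFlushCompletion() leaves the
    // accumulator permanently "flushing" and defeats linger/batching.
    boolean flushInProgress() {
        return flushesInProgress.get() > 0;
    }
}
{code}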



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-06-10 Thread GitBox


jsancio commented on pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#issuecomment-858925958


   > @jsancio Can you share more details about the possible concurrency 
scenario with createSnapshot ? BTW, will moving the validation to 
onSnapshotFrozen imply that before creating the snapshot, there's no 
validation? I think maybe we can keep the validation here and add some 
additional check before freeze() which makes the snapshot visible?
   
   @feyman2016, I think it is reasonable to do both. Validate when 
`createSnapshot` is called and validate again in `onSnapshotFrozen`. In both 
cases this validation should be optional. Validate if it is created through 
`RaftClient.createSnapshot`. Don't validate if `KafkaRaftClient` creates the 
snapshot internally because of a `FetchResponse` from the leader.
   
   I have been working on a PR related to this if you want to take a look: 
https://github.com/apache/kafka/pull/10786. It would be nice to get your PR 
merged before my PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9897) Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores

2021-06-10 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361180#comment-17361180
 ] 

Matthias J. Sax commented on KAFKA-9897:


Different test, but same symptom:
{quote}java.lang.AssertionError: Expected: a string containing "Cannot get 
state store source-table because the stream thread is PARTITIONS_ASSIGNED, not 
RUNNING" but: was "The state store, source-table, may have migrated to another 
instance." at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryOnlyActivePartitionStoresByDefault$2(StoreQueryIntegrationTest.java:151)
 at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:420)
 at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryOnlyActivePartitionStoresByDefault(StoreQueryIntegrationTest.java:131){quote}

> Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores
> -
>
> Key: KAFKA-9897
> URL: https://issues.apache.org/jira/browse/KAFKA-9897
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.6.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/22/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQuerySpecificActivePartitionStores/]
> {quote}org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get 
> state store source-table because the stream thread is PARTITIONS_ASSIGNED, 
> not RUNNING at 
> org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:85)
>  at 
> org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:61)
>  at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1183) at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:178){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12377) Flaky Test SaslAuthenticatorTest#testSslClientAuthRequiredForSaslSslListener

2021-06-10 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361177#comment-17361177
 ] 

Matthias J. Sax commented on KAFKA-12377:
-

Failed again.

> Flaky Test SaslAuthenticatorTest#testSslClientAuthRequiredForSaslSslListener
> 
>
> Key: KAFKA-12377
> URL: https://issues.apache.org/jira/browse/KAFKA-12377
> Project: Kafka
>  Issue Type: Test
>  Components: core, security, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> {quote}org.opentest4j.AssertionFailedError: expected:  
> but was:  at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at 
> org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124) at 
> org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:111)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.createAndCheckClientConnectionFailure(SaslAuthenticatorTest.java:2187)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.createAndCheckSslAuthenticationFailure(SaslAuthenticatorTest.java:2210)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.verifySslClientAuthForSaslSslListener(SaslAuthenticatorTest.java:1846)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.testSslClientAuthRequiredForSaslSslListener(SaslAuthenticatorTest.java:1800){quote}
> STDOUT
> {quote}[2021-02-26 07:18:57,220] ERROR Extensions provided in login context 
> without a token 
> (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule:318) 
> java.io.IOException: Extensions provided in login context without a token at 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:165)
>  at 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
>  at 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
> [...]
> Caused by: 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerConfigException:
>  Extensions provided in login context without a token at 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:192)
>  at 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:163)
>  ... 116 more{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2021-06-10 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361176#comment-17361176
 ] 

Matthias J. Sax commented on KAFKA-8940:


Failed again.

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-8940
> URL: https://issues.apache.org/jira/browse/KAFKA-8940
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: flaky-test, newbie++
>
> The test does not properly account for windowing. See this comment for full 
> details.
> We can patch this test by fixing the timestamps of the input data to avoid 
> crossing over a window boundary, or account for this when verifying the 
> output. Since we have access to the input data it should be possible to 
> compute whether/when we do cross a window boundary, and adjust the expected 
> output accordingly



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12933) Flaky test ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled

2021-06-10 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361174#comment-17361174
 ] 

Matthias J. Sax commented on KAFKA-12933:
-

Failed a second time.

> Flaky test 
> ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled
> -
>
> Key: KAFKA-12933
> URL: https://issues.apache.org/jira/browse/KAFKA-12933
> Project: Kafka
>  Issue Type: Test
>  Components: admin
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> {quote}org.opentest4j.AssertionFailedError: expected: <true> but was: <false> 
> at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35) at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162) at 
> kafka.admin.ReassignPartitionsIntegrationTest.executeAndVerifyReassignment(ReassignPartitionsIntegrationTest.scala:130)
>  at 
> kafka.admin.ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled(ReassignPartitionsIntegrationTest.scala:74){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12933) Flaky test ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled

2021-06-10 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-12933:
---

 Summary: Flaky test 
ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled
 Key: KAFKA-12933
 URL: https://issues.apache.org/jira/browse/KAFKA-12933
 Project: Kafka
  Issue Type: Test
  Components: admin
Reporter: Matthias J. Sax


{quote}org.opentest4j.AssertionFailedError: expected: <true> but was: <false> 
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at 
org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35) at 
org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162) at 
kafka.admin.ReassignPartitionsIntegrationTest.executeAndVerifyReassignment(ReassignPartitionsIntegrationTest.scala:130)
 at 
kafka.admin.ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled(ReassignPartitionsIntegrationTest.scala:74){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-10 Thread GitBox


ijuma commented on a change in pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#discussion_r649435657



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
##
@@ -92,6 +93,13 @@ public MetadataRequest build(short version) {
 if (!data.allowAutoTopicCreation() && version < 4)
 throw new UnsupportedVersionException("MetadataRequest 
versions older than 4 don't support the " +
 "allowAutoTopicCreation field");
+if (data.topics() != null) {

Review comment:
   Can we remove the unnecessary `()` from the various files?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException

2021-06-10 Thread GitBox


jsancio commented on a change in pull request #10749:
URL: https://github.com/apache/kafka/pull/10749#discussion_r649429301



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -372,27 +371,23 @@ private void maybeFireLeaderChange() {
 
 @Override
 public void initialize() {
-try {
-quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, 
log.lastFetchedEpoch()));
+quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, 
log.lastFetchedEpoch()));
 
-long currentTimeMs = time.milliseconds();
-if (quorum.isLeader()) {
-throw new IllegalStateException("Voter cannot initialize as a 
Leader");
-} else if (quorum.isCandidate()) {
-onBecomeCandidate(currentTimeMs);
-} else if (quorum.isFollower()) {
-onBecomeFollower(currentTimeMs);
-}
+long currentTimeMs = time.milliseconds();
+if (quorum.isLeader()) {
+throw new IllegalStateException("Voter cannot initialize as a 
Leader");
+} else if (quorum.isCandidate()) {
+onBecomeCandidate(currentTimeMs);
+} else if (quorum.isFollower()) {
+onBecomeFollower(currentTimeMs);
+}
 
-// When there is only a single voter, become candidate immediately
-if (quorum.isVoter()
-&& quorum.remoteVoters().isEmpty()
-&& !quorum.isCandidate()) {
+// When there is only a single voter, become candidate immediately
+if (quorum.isVoter()
+&& quorum.remoteVoters().isEmpty()
+&& !quorum.isCandidate()) {
 
-transitionToCandidate(currentTimeMs);
-}
-} catch (IOException e) {
-throw new RuntimeException(e);
+transitionToCandidate(currentTimeMs);

Review comment:
   Leaving this note for future readers. My comments above are not 
accurate. I misread the diff generated by GitHub. When I wrote the comment, I 
was under the impression that the old code was handling, wrapping and 
re-throwing the `IOException`.
   Instead the old code wrapped and re-threw the `IOException`; it was not 
handling the exception.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-10 Thread GitBox


jolshan commented on a change in pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#discussion_r649426864



##
File path: core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
##
@@ -234,6 +235,32 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
 }
   }
 
+  @Test
+  def testInvalidMetadataRequestReturnsError(): Unit = {

Review comment:
   Ah wait. I think I figured out a possible way to do this after all. :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-10 Thread GitBox


jolshan commented on a change in pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#discussion_r649425805



##
File path: core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
##
@@ -234,6 +235,32 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
 }
   }
 
+  @Test
+  def testInvalidMetadataRequestReturnsError(): Unit = {

Review comment:
   I must have missed this comment earlier. I moved the test to 
KafkaApisTest. There we don't check if the response is built since that is part 
of the request/response handling code. From what I saw the other invalid tests 
were very focused on one thing. 
   
   I think the biggest question is whether we care to test the null name is 
correctly being set to the empty string. I agree it is good not to start and 
stop another kafka cluster.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on pull request #10813: KAFKA-9559: Change default serde to be `null`

2021-06-10 Thread GitBox


lct45 commented on pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#issuecomment-858849165


   Okay I did some digging -> `defaultKeySerde` and `defaultValueSerde` are 
only called from the `init` of `AbstractProcessorContext`. I checked all the 
places where we call `AbstractProcessorContext#keySerde()` and 
`AbstractProcessorContext#valueSerde()` to make sure we're catching all the 
potential NPEs, and I am fairly confident that we're ok. 
   
   I did some streamlining so that we now throw the `ConfigException` right after 
we access `AbstractProcessorContext#keySerde()` / `valueSerde()`, so we aren't 
passing nulls around and there's a clear correspondence between throwing errors 
and calling a certain method. The one place this wasn't possible was with 
creating state stores. Right now, we pass around `context.keySerde()` and 
`context.valueSerde()` rather than just the `context` in 
`MeteredKeyValueStore`, `MeteredSessionStore`, and `MeteredWindowStore`. The 
tricky part with moving to passing around the context is that we would need to 
accept two types of context, a `ProcessorContext` and a `StateStoreContext`. I'm 
open to either leaving these calls less streamlined than everything else, or 
duplicating code in `WrappingNullableUtils` to accept both types of context (a 
sketch of the latter follows below). 
Thoughts @mjsax @ableegoldman ?
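   For concreteness, a minimal sketch of the duplicated-helper option, assuming 
the context default serde can be handed in as a supplier; the names here are 
illustrative and not the actual `WrappingNullableUtils` API:

   ```java
   import java.util.function.Supplier;
   import org.apache.kafka.common.config.ConfigException;
   import org.apache.kafka.common.serialization.Serde;

   final class SerdeResolution {
       // Resolve a store-level serde, falling back to the context default, and
       // throw a ConfigException immediately if neither is set, instead of
       // passing null further down.
       static <T> Serde<T> resolveKeySerde(Serde<T> storeSerde,
                                           Supplier<Serde<T>> contextDefault,
                                           String owner) {
           if (storeSerde != null) {
               return storeSerde;
           }
           Serde<T> fallback = contextDefault.get();
           if (fallback == null) {
               throw new ConfigException("No key serde set for " + owner
                   + " and no default.key.serde configured");
           }
           return fallback;
       }
   }
   ```

   An overload taking a `ProcessorContext` and one taking a `StateStoreContext` 
could each delegate here, which keeps the duplication down to two thin wrappers.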


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-10 Thread GitBox


jsancio commented on pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#issuecomment-858848250


   Created a Jira for renaming the types SnapshotWriter and SnapshotReader and 
instead adding interfaces with the same names:
   https://issues.apache.org/jira/browse/KAFKA-12932


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12932) Interfaces for SnapshotReader and SnapshotWriter

2021-06-10 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-12932:
--

 Summary: Interfaces for SnapshotReader and SnapshotWriter
 Key: KAFKA-12932
 URL: https://issues.apache.org/jira/browse/KAFKA-12932
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jose Armando Garcia Sancio


Change the snapshot API so that SnapshotWriter and SnapshotReader are 
interfaces. Rename the existing SnapshotWriter and SnapshotReader types and have 
them implement the interfaces introduced by this issue.
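A rough sketch of the intended shape, with an illustrative method set and a 
placeholder for the renamed concrete type:

{code:java}
import java.util.List;

// The public name becomes an interface...
interface SnapshotWriter<T> extends AutoCloseable {
    void append(List<T> records); // buffer records into the snapshot
    void freeze();                // make the snapshot immutable and visible
    @Override
    void close();                 // abort the snapshot if it was never frozen
}

// ...and the existing class is renamed (the new name is not decided here) and
// implements the interface.
{code}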



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12931) KIP-746: Revise KRaft Metadata Records

2021-06-10 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-12931:
---
Fix Version/s: 3.0.0

> KIP-746: Revise KRaft Metadata Records
> --
>
> Key: KAFKA-12931
> URL: https://issues.apache.org/jira/browse/KAFKA-12931
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, kraft
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dchristle edited a comment on pull request #10847: KAFKA-12921: Upgrade ZSTD JNI from 1.4.9-1 to 1.5.0-1

2021-06-10 Thread GitBox


dchristle edited a comment on pull request #10847:
URL: https://github.com/apache/kafka/pull/10847#issuecomment-858843145


   > The current version is 1.4.9, so I'm a bit confused why we're mentioning 
anything besides 1.5.0.
   
   Whoops - I'm getting my wires crossed with a different zstd-1.5.0-related PR I 
have with a larger upgrade. You are right -- this is just from `1.4.9-1` to 
`1.5.0-1`. Sorry for my confusion. I updated the PR description to reflect this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dchristle commented on pull request #10847: KAFKA-12921: Upgrade ZSTD JNI from 1.4.9-1 to 1.5.0-1

2021-06-10 Thread GitBox


dchristle commented on pull request #10847:
URL: https://github.com/apache/kafka/pull/10847#issuecomment-858843145


   > The current version is 1.4.9, so I'm a bit confused why we're mentioning 
anything besides 1.5.0.
   
   Whoops - I'm getting my wires crossed with a different zstd-1.5.0-related PR I 
have with a larger upgrade. You are right -- this is just from `1.4.9-1` to 
`1.5.0-1`. Sorry for my confusion.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12931) KIP-746: Revise KRaft Metadata Records

2021-06-10 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-12931:


 Summary: KIP-746: Revise KRaft Metadata Records
 Key: KAFKA-12931
 URL: https://issues.apache.org/jira/browse/KAFKA-12931
 Project: Kafka
  Issue Type: Improvement
  Components: kraft, controller
Reporter: Colin McCabe
Assignee: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dchristle edited a comment on pull request #10847: KAFKA-12921: Upgrade ZSTD JNI from 1.4.9-1 to 1.5.0-1

2021-06-10 Thread GitBox


dchristle edited a comment on pull request #10847:
URL: https://github.com/apache/kafka/pull/10847#issuecomment-858804912


   @ijuma 
   
   > This is a good change, but can we please qualify the perf improvements 
claim? My understanding is that it only applies to certain compression levels 
and Kafka currently always picks a specific one. @dongjinleekr is working on 
making that configurable via a separate KIP.
   
   It is true that the most recent performance improvements I quoted (for 
`1.5.0`) appear only in mid-range compression levels. 
   
   > Also, why are we listing versions in the PR description that are not 
relevant to this upgrade?
   
   I tried to follow a previous `zstd-jni` PR's convention here: 
https://github.com/apache/kafka/pull/10285 . I think it gives context on the 
magnitude of the upgrade, but I can change the commit message/PR title to 
remove the existing version reference if you like.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma edited a comment on pull request #10847: KAFKA-12921: Upgrade ZSTD JNI from 1.4.9-1 to 1.5.0-1

2021-06-10 Thread GitBox


ijuma edited a comment on pull request #10847:
URL: https://github.com/apache/kafka/pull/10847#issuecomment-858829828


   The current version is 1.4.9, so I'm a bit confused why we're mentioning 
anything besides 1.5.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10847: KAFKA-12921: Upgrade ZSTD JNI from 1.4.9-1 to 1.5.0-1

2021-06-10 Thread GitBox


ijuma commented on pull request #10847:
URL: https://github.com/apache/kafka/pull/10847#issuecomment-858829828


   The current version is 1.4.9, so I'm a bit confused why we're mentioning 
anything besides 1.5.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-06-10 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r649390677



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from the remote log metadata 
topic ({@link 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintaining the state of the remote log segment metadata. It 
provides an API to add or remove
+ * the topic partitions whose metadata should be consumed by this instance, 
using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link 
#removeAssignmentsForPartitions(Set)} respectively.
+ * 
+ * When a broker is started, the controller sends the topic partitions that 
this broker is a leader or follower for, and the
+ * partitions to be deleted. This class receives those notifications through
+ * {@link #addAssignmentsForPartitions(Set)} and {@link 
#removeAssignmentsForPartitions(Set)} and assigns a consumer for the
+ * respective remote log metadata partitions by using {@link 
RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any later leadership changes arrive through the same API. We remove 
the partitions that are deleted from
+ * this broker, which are received through {@link 
#removeAssignmentsForPartitions(Set)}.
+ * 
+ * After receiving these events it invokes {@link 
RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains an in-memory representation of the state of {@link 
RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
+
+private static final long POLL_INTERVAL_MS = 30L;
+
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+private final KafkaConsumer<byte[], byte[]> consumer;
+private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
+private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+private volatile boolean close = false;

Review comment:
   Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-06-10 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r649390516



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+
+/**
+ * This class is responsible for publishing messages into the remote log 
metadata topic partitions.
+ */
+public class ProducerManager implements Closeable {
+private static final Logger log = 
LoggerFactory.getLogger(ProducerManager.class);
+
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+private final KafkaProducer<byte[], byte[]> producer;
+private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+private final TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
+
+private volatile boolean close = false;

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-06-10 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r649390143



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+
+/**
+ * This class is responsible for publishing messages into the remote log 
metadata topic partitions.
+ */
+public class ProducerManager implements Closeable {
+private static final Logger log = 
LoggerFactory.getLogger(ProducerManager.class);
+
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+private final KafkaProducer<byte[], byte[]> producer;
+private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+private final TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
+
+private volatile boolean close = false;
+
+public ProducerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
+   RemoteLogMetadataTopicPartitioner 
rlmmTopicPartitioner) {
+this.rlmmConfig = rlmmConfig;
+this.producer = new KafkaProducer<>(rlmmConfig.producerProperties());
+topicPartitioner = rlmmTopicPartitioner;
+}
+
+public RecordMetadata publishMessage(TopicIdPartition topicIdPartition,
+ RemoteLogMetadata remoteLogMetadata) 
throws KafkaException {
+ensureNotClosed();
+
+int metadataPartitionNo = 
topicPartitioner.metadataPartition(topicIdPartition);
+log.debug("Publishing metadata message of partition:[{}] into metadata 
topic partition:[{}] with payload: [{}]",
+topicIdPartition, metadataPartitionNo, remoteLogMetadata);
+
+ProducerCallback callback = new ProducerCallback();
+try {
+if (metadataPartitionNo >= 
rlmmConfig.metadataTopicPartitionsCount()) {
+// This should never occur as long as metadata partitions 
always remain the same.
+throw new KafkaException("Chosen partition no " + 
metadataPartitionNo +
+ " is more than the partition count: " 
+ rlmmConfig.metadataTopicPartitionsCount());
+}
+producer.send(new 
ProducerRecord<>(rlmmConfig.remoteLogMetadataTopicName(), metadataPartitionNo, 
null,
+serde.serialize(remoteLogMetadata)), callback).get();
+} catch (KafkaException e) {
+throw e;
+} catch (Exception e) {
+throw new KafkaException("Exception occurred while publishing 
message for topicIdPartition: " + topicIdPartition, e);
+}
+
+if (callback.exception() != null) {
+Exception ex = callback.exception();
+if (ex instanceof KafkaException) {
+throw (KafkaException) ex;
+} else {
+throw new KafkaException(ex);
+}
+} else {
+return callback.recordMetadata();
+}
+}
+
+private void ensureNotClosed() {
+if (close) {
+throw new IllegalStateException("This instance is already set in 
close state.");

Review comment:
   Done

##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding 

[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-06-10 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r649388017



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from the remote log metadata 
topic ({@link 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintaining the state of the remote log segment metadata. It 
provides an API to add or remove
+ * the topic partitions whose metadata should be consumed by this instance, 
using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link 
#removeAssignmentsForPartitions(Set)} respectively.
+ * 
+ * When a broker is started, the controller sends the topic partitions that 
this broker is a leader or follower for, and the
+ * partitions to be deleted. This class receives those notifications through
+ * {@link #addAssignmentsForPartitions(Set)} and {@link 
#removeAssignmentsForPartitions(Set)} and assigns a consumer for the
+ * respective remote log metadata partitions by using {@link 
RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any later leadership changes arrive through the same API. We remove 
the partitions that are deleted from
+ * this broker, which are received through {@link 
#removeAssignmentsForPartitions(Set)}.
+ * 
+ * After receiving these events it invokes {@link 
RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains an in-memory representation of the state of {@link 
RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
+
+private static final long POLL_INTERVAL_MS = 30L;
+
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+private final KafkaConsumer<byte[], byte[]> consumer;
+private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
+private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+private volatile boolean close = false;
+private volatile boolean assignPartitions = false;
+
+private final Object assignPartitionsLock = new Object();
+
+// Remote log metadata topic partitions that consumer is assigned to.
+private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet();
+
+// User topic partitions that this broker is a leader/follower for.
+private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
+
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map<Integer, Long> partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+
+public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
+RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+RemoteLogMetadataTopicPartitioner topicPartitioner) {
+this.consumer = consumer;
+

[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-06-10 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r649387744



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+
+public final class TopicBasedRemoteLogMetadataManagerConfig {
+private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfig.class.getName());
+
+public static final String REMOTE_LOG_METADATA_TOPIC_NAME = 
"__remote_log_metadata";
+
+public static final String 
REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP = 
"remote.log.metadata.topic.replication.factor";
+public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP = 
"remote.log.metadata.topic.num.partitions";
+public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS_PROP 
= "remote.log.metadata.topic.retention.ms";
+public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP = 
"remote.log.metadata.publish.wait.ms";
+
+public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS = 50;
+public static final long 
DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS = -1L;
+public static final int 
DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3;
+public static final long DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS = 60 
* 1000L;
+
+public static final String 
REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_DOC = "Replication factor of 
remote log metadata Topic.";
+public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC = "The 
number of partitions for the remote log metadata topic.";
+public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS_DOC 
= "Remote log metadata topic log retention in milli seconds." +
+"Default: -1, that means unlimited. Users can configure this value 
based on their use cases. " +
+"To avoid any data loss, this value should be more than the 
maximum retention period of any topic enabled with " +
+"tiered storage in the cluster.";
+public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC = "The 
amount of time in milliseconds to wait for the local consumer to " +

Review comment:
   We do not want this to be completely blocked, as we want to release the 
remote log thread after a specific timeout in case of any intermittent issues, 
so that tiering of other partitions can proceed.  
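   As a minimal illustration of that bounded wait (not the actual RLMM code; the 
names and the 60s value, which mirrors the default of 
remote.log.metadata.publish.wait.ms, are assumptions):

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;

   class BoundedWaitExample {
       // Wait for the local consumer to signal that it has caught up to the
       // published offset, but give up after the timeout so the remote log
       // thread is released and tiering of other partitions can proceed.
       static boolean awaitCatchUp(CountDownLatch caughtUp) throws InterruptedException {
           return caughtUp.await(60_000L, TimeUnit.MILLISECONDS);
       }
   }
   ```

   A caller that gets `false` back can log a warning and retry on the next 
tiering pass instead of blocking indefinitely.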

##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, 

[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-06-10 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r649387317



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+
+public final class TopicBasedRemoteLogMetadataManagerConfig {
+private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfig.class.getName());
+
+public static final String REMOTE_LOG_METADATA_TOPIC_NAME = 
"__remote_log_metadata";

Review comment:
   I plan to add this once RLMM is called from the remote log layer classes. I 
wanted this change to be self-contained for now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dchristle commented on pull request #10847: KAFKA-12921: Upgrade ZSTD JNI from 1.4.9-1 to 1.5.0-1

2021-06-10 Thread GitBox


dchristle commented on pull request #10847:
URL: https://github.com/apache/kafka/pull/10847#issuecomment-858804912


   @ijuma 
   
   > This is a good change, but can we please qualify the perf improvements 
claim? My understanding is that it only applies to certain compression levels 
and Kafka currently always picks a specific one. @dongjinleekr is working on 
making that configurable via a separate KIP.
   
   It is true that the most recent performance improvements I quoted (for 
`1.5.0`) appear only in mid-range compression levels. I did not highlight it in 
my description, but besides bug fixes, the earlier releases quote consistent 
perf improvements:
   
   - `1.4.4`: ~10% decompression bump, no level-dependence quoted
   - `1.4.5`: 5-10% decompression improvement in `x86_64` architecture, +15-50% 
in various `arm` processors
   - `1.4.7`: Improved `--long` mode compression ratio at high levels, 5-30% 
decompression improvement for blocks < 32kB
   - `1.4.9`: 2x faster `--long` mode compression speed
   
   > Also, why are we listing versions in the PR description that are not 
relevant to this upgrade?
   
   I tried to follow a previous `zstd-jni` PR's convention here: 
https://github.com/apache/kafka/pull/10285 . I think it gives context on the 
magnitude of the upgrade, but I can change the commit message/PR title to 
remove the existing version reference if you like.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10733: KAFKA-12816 Added tiered storage related configs including remote log manager configs.

2021-06-10 Thread GitBox


junrao commented on a change in pull request #10733:
URL: https://github.com/apache/kafka/pull/10733#discussion_r649337876



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public final class RemoteLogManagerConfig {
+
+/**
+ * Prefix used for properties to be passed to {@link RemoteStorageManager} 
implementation. Remote log subsystem collects all the properties having
+ * this prefix and passed to {@code RemoteStorageManager} using {@link 
RemoteStorageManager#configure(Map)}.

Review comment:
   passed => passes

##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public final class RemoteLogManagerConfig {
+
+/**
+ * Prefix used for properties to be passed to {@link RemoteStorageManager} 
implementation. Remote log subsystem collects all the properties having
+ * this prefix and passed to {@code RemoteStorageManager} using {@link 
RemoteStorageManager#configure(Map)}.
+ */
+public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP = 
"remote.log.storage.manager.impl.prefix";
+public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteStorageManager " +
+"implementation. For example this value can be `rsm.s3.`.";
+
+/**
+ * Prefix used for properties to be passed to {@link 
RemoteLogMetadataManager} implementation. Remote log subsystem collects all the 
properties having
+ * this prefix and passed to {@code RemoteLogMetadataManager} using {@link 
RemoteLogMetadataManager#configure(Map)}.
+ */
+public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP 
= 

[GitHub] [kafka] ijuma commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-10 Thread GitBox


ijuma commented on a change in pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#discussion_r649369319



##
File path: core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
##
@@ -234,6 +235,32 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
 }
   }
 
+  @Test
+  def testInvalidMetadataRequestReturnsError(): Unit = {

Review comment:
   If you think it's valuable, we can keep it. But can we piggyback on 
another test that tests invalid things? Then we'd save starting and stopping a 
Kafka cluster.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-10 Thread GitBox


ijuma commented on a change in pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#discussion_r649368540



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
##
@@ -92,6 +93,15 @@ public MetadataRequest build(short version) {
 if (!data.allowAutoTopicCreation() && version < 4)
 throw new UnsupportedVersionException("MetadataRequest 
versions older than 4 don't support the " +
 "allowAutoTopicCreation field");
+if (version >= 10) {
+if (data.topics() != null) {
+data.topics().forEach(topic -> {
+if (topic.name() == null || topic.topicId() != 
Uuid.ZERO_UUID)
+throw new 
UnsupportedVersionException("MetadataRequest version " + version  +
+" does not support null topic names or 
topic IDs.");

Review comment:
   Yeah, makes sense.

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
##
@@ -92,6 +93,15 @@ public MetadataRequest build(short version) {
 if (!data.allowAutoTopicCreation() && version < 4)
 throw new UnsupportedVersionException("MetadataRequest 
versions older than 4 don't support the " +
 "allowAutoTopicCreation field");
+if (version >= 10) {

Review comment:
   +1




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12468) Initial offsets are copied from source to target cluster

2021-06-10 Thread Amber Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361084#comment-17361084
 ] 

Amber Liu commented on KAFKA-12468:
---

I was using standalone mode with an active-passive setup and saw negative offsets 
in the past as well. One of the causes I found was a consumer request timeout, 
e.g. org.apache.kafka.common.errors.DisconnectException. I increased the request 
timeout and tasks.max, and the offsets are synced correctly now.
{code:java}
# consumer, need to set higher timeout
source.admin.request.timeout.ms = 18
source.consumer.session.timeout.ms = 18
source.consumer.request.timeout.ms = 18{code}
 

> Initial offsets are copied from source to target cluster
> 
>
> Key: KAFKA-12468
> URL: https://issues.apache.org/jira/browse/KAFKA-12468
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Bart De Neuter
>Priority: Major
>
> We have an active-passive setup where  the 3 connectors from mirror maker 2 
> (heartbeat, checkpoint and source) are running on a dedicated Kafka connect 
> cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it 
> seems the offsets from the source cluster are initially copied to the target 
> cluster without translation. This causes a negative lag for all synced 
> consumer groups. Only when we reset the offsets for each topic/partition on 
> the target cluster and produce a record on the topic/partition in the source, 
> the sync starts working correctly. 
> I would expect that the consumer groups are synced but that the current 
> offsets of the source cluster are not copied to the target cluster.
> This is the configuration we are currently using:
> Heartbeat connector
>  
> {code:xml}
> {
>   "name": "mm2-mirror-heartbeat",
>   "config": {
> "name": "mm2-mirror-heartbeat",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "1",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
> Checkpoint connector:
> {code:xml}
> {
>   "name": "mm2-mirror-checkpoint",
>   "config": {
> "name": "mm2-mirror-checkpoint",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  Source connector:
> {code:xml}
> {
>   "name": "mm2-mirror-source",
>   "config": {
> "name": "mm2-mirror-source",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorSourceConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-10 Thread GitBox


jolshan commented on a change in pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#discussion_r649360987



##
File path: core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
##
@@ -234,6 +235,32 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
 }
   }
 
+  @Test
+  def testInvalidMetadataRequestReturnsError(): Unit = {

Review comment:
   I guess the only other reason I tested the whole path was to make sure 
the response could be sent back (if the name was null, it could not be), but 
it should suffice to also have a non-null check.
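   Illustratively, the substitution boils down to something like the following 
when building the response topic (a sketch; the surrounding handler code and the 
chosen error are assumptions, though the setters follow the generated 
`MetadataResponseData` accessors):

   ```java
   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.common.message.MetadataResponseData;
   import org.apache.kafka.common.protocol.Errors;

   class MetadataResponseTopics {
       // A topic with a null name cannot be serialized in the response, so
       // substitute an empty string before building the error topic.
       static MetadataResponseData.MetadataResponseTopic errorTopic(String topicName, Uuid topicId) {
           return new MetadataResponseData.MetadataResponseTopic()
               .setErrorCode(Errors.INVALID_REQUEST.code())
               .setName(topicName == null ? "" : topicName)
               .setTopicId(topicId);
       }
   }
   ```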




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison merged pull request #10653: MINOR: Add missing parameter description from AdminZkClient

2021-06-10 Thread GitBox


mimaison merged pull request #10653:
URL: https://github.com/apache/kafka/pull/10653


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-10 Thread GitBox


jolshan commented on a change in pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#discussion_r649353683



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
##
@@ -92,6 +93,15 @@ public MetadataRequest build(short version) {
 if (!data.allowAutoTopicCreation() && version < 4)
 throw new UnsupportedVersionException("MetadataRequest 
versions older than 4 don't support the " +
 "allowAutoTopicCreation field");
+if (version >= 10) {
+if (data.topics() != null) {
+data.topics().forEach(topic -> {
+if (topic.name() == null || topic.topicId() != 
Uuid.ZERO_UUID)
+throw new 
UnsupportedVersionException("MetadataRequest version " + version  +
+" does not support null topic names or 
topic IDs.");

Review comment:
   Would it make sense to say non-zero topic IDs? Since the null ID is 
represented with all zeros?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-10 Thread GitBox


jolshan commented on a change in pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#discussion_r649352825



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
##
@@ -92,6 +93,15 @@ public MetadataRequest build(short version) {
 if (!data.allowAutoTopicCreation() && version < 4)
 throw new UnsupportedVersionException("MetadataRequest 
versions older than 4 don't support the " +
 "allowAutoTopicCreation field");
+if (version >= 10) {

Review comment:
   We will need to check the version when this is fixed, but I can remove 
the version check for now.
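   For reference, the version-gate-free shape would look roughly like this (a 
sketch of the direction discussed, not the final patch; the error message is 
illustrative):

   ```java
   // With topic IDs not yet usable in metadata requests, a null topic name or a
   // non-zero topic id is invalid regardless of the request version.
   if (data.topics() != null) {
       data.topics().forEach(topic -> {
           if (topic.name() == null || !Uuid.ZERO_UUID.equals(topic.topicId()))
               throw new UnsupportedVersionException("MetadataRequest version " + version +
                   " does not support null topic names or non-zero topic IDs.");
       });
   }
   ```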




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-10 Thread GitBox


jolshan commented on a change in pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#discussion_r649352099



##
File path: core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
##
@@ -234,6 +235,32 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
 }
   }
 
+  @Test
+  def testInvalidMetadataRequestReturnsError(): Unit = {

Review comment:
   This was one way to test the KafkaApis code, but I suppose I could move 
this to a unit test that only tests the method itself (and not the whole 
request path).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #10805: KAFKA-12436 KIP-720 Deprecate MirrorMaker v1

2021-06-10 Thread GitBox


mimaison commented on pull request #10805:
URL: https://github.com/apache/kafka/pull/10805#issuecomment-858766776


   This KIP was adopted on the basis of having an IdentityReplicationPolicy, 
which is in this PR: https://github.com/apache/kafka/pull/10652
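   For context, the idea of that policy is to keep remote topic names identical 
instead of prefixing them with the source cluster alias; a rough sketch, not the 
PR's actual implementation:

   ```java
   import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;

   public class IdentityReplicationPolicySketch extends DefaultReplicationPolicy {
       // Keep the original topic name rather than "sourceAlias.topic".
       @Override
       public String formatRemoteTopic(String sourceClusterAlias, String topic) {
           return topic;
       }
   }
   ```

   A real policy along these lines also has to special-case heartbeat topics and 
upstream-topic resolution, which this sketch glosses over.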


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12925) prefixScan missing from intermediate interfaces

2021-06-10 Thread Michael Viamari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361062#comment-17361062
 ] 

Michael Viamari commented on KAFKA-12925:
-

I can flesh out a larger example if needed, but the basic usage for me was 
getting a reference to the state store using {{context.getStateStore()}} inside 
{{Transformer#init}}, and then when attempting to use 
{{TimestampedKeyValueStore#prefixScan}}, the exception was thrown.
{code:java}
public class TransformerPrefixScan<K, V, VR> implements Transformer<K, V, KeyValue<K, VR>> {

    private ProcessorContext context;
    private TimestampedKeyValueStore<String, VR> lookupStore;

    public TransformerPrefixScan() {}

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        lookupStore = context.getStateStore(lookupStoreName);
    }

    @Override
    public KeyValue<K, VR> transform(K key, V value) {

        String keyPrefix = extractPrefix(key);
        try (KeyValueIterator<String, ValueAndTimestamp<VR>> lookupIterator =
                 lookupStore.prefixScan(keyPrefix, Serdes.String())) {
            // handle results
        }

        return null;
    }

    @Override
    public void close() {

    }
}
{code}
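The fix implied here is for each intermediate wrapper to delegate explicitly 
rather than inherit the throwing default; a sketch, assuming the decorator 
extends a wrapper that exposes {{wrapped()}} (signature shape from KIP-614):

{code:java}
@Override
public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix,
                                                                       final PS prefixKeySerializer) {
    // Delegate to the underlying store instead of falling back to the default
    // implementation, which throws UnsupportedOperationException.
    return wrapped().prefixScan(prefix, prefixKeySerializer);
}
{code}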

> prefixScan missing from intermediate interfaces
> ---
>
> Key: KAFKA-12925
> URL: https://issues.apache.org/jira/browse/KAFKA-12925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Michael Viamari
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.0.0, 2.8.1
>
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] 
> introduced support for {{prefixScan}} to StateStores.
> It seems that many of the intermediate {{StateStore}} interfaces are missing 
> a definition for {{prefixScan}}, and as such it is not accessible in all cases.
> For example, when accessing the state stores through the processor context, 
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not 
> define {{prefixScan}} and it falls back to the default implementation in 
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison merged pull request #10849: KAFKA-12922: MirrorCheckpointTask should close topic filter

2021-06-10 Thread GitBox


mimaison merged pull request #10849:
URL: https://github.com/apache/kafka/pull/10849


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] edoardocomar commented on pull request #10649: KAFKA-12762: Use connection timeout when polling the network for new …

2021-06-10 Thread GitBox


edoardocomar commented on pull request #10649:
URL: https://github.com/apache/kafka/pull/10649#issuecomment-858752590


   This last commit (thanks @tombentley ) allows the integration test to leave 
the Admin interface unchanged; the expanded factory method is only part of the 
test classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-10 Thread GitBox


jsancio commented on pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#issuecomment-858750625


   @hachikuji thanks for the review. Updated the PR to address your comments.
   
   cc @cmccabe 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-10 Thread GitBox


jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r649273990



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -233,18 +233,40 @@ final class KafkaMetadataLog private (
     log.topicId.get
   }
 
-  override def createSnapshot(snapshotId: OffsetAndEpoch): RawSnapshotWriter = {
-    // Do not let the state machine create snapshots older than the latest snapshot
-    latestSnapshotId().ifPresent { latest =>
-      if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
-        // Since snapshots are less than the high-watermark absolute offset comparison is okay.
-        throw new IllegalArgumentException(
-          s"Attempting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)"
-        )
-      }
+  override def createSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotWriter] = {
+    if (snapshots.contains(snapshotId)) {
+      Optional.empty()
+    } else {
+      Optional.of(FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this)))
+    }
+  }
+
+  override def createSnapshotFromEndOffset(endOffset: Long): Optional[RawSnapshotWriter] = {
+    val highWatermarkOffset = highWatermark.offset
+    if (endOffset > highWatermarkOffset) {
+      throw new IllegalArgumentException(
+        s"Cannot create a snapshot for an end offset ($endOffset) greater than the high-watermark ($highWatermarkOffset)"
+      )
+    }
+
+    if (endOffset < startOffset) {
+      throw new IllegalArgumentException(
+        s"Cannot create a snapshot for an end offset ($endOffset) less than the log start offset ($startOffset)"
+      )
+    }
+
+    val epoch = log.leaderEpochCache.flatMap(_.findEpochEntryByEndOffset(endOffset)) match {
+      case Some(epochEntry) =>
+        epochEntry.epoch
+      case None =>
+        // Assume that the end offset falls in the current epoch since based on the check above:

Review comment:
   I removed this code. To avoid scanning the leader epoch cache, I reverted 
the snapshot creation API so that both the offset and the epoch are passed to 
`createSnapshot`. The new code just validates that the given offset and epoch 
are valid according to the record batches in the log and the leader epoch 
cache.
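
   As a rough standalone sketch of that validation shape (simplified stand-ins for the log state; not the actual `KafkaMetadataLog` code):

```java
public class SnapshotIdValidationExample {
    // Hypothetical stand-ins for the log state:
    static final long LOG_START_OFFSET = 0;
    static final long HIGH_WATERMARK = 100;
    static final int CURRENT_EPOCH = 3;

    // Reject a proposed (endOffset, epoch) snapshot id that is inconsistent
    // with the log, instead of deriving the epoch by scanning the cache.
    static void validateSnapshotId(long endOffset, int epoch) {
        if (endOffset > HIGH_WATERMARK) {
            throw new IllegalArgumentException("end offset " + endOffset
                + " is greater than the high-watermark " + HIGH_WATERMARK);
        }
        if (endOffset < LOG_START_OFFSET) {
            throw new IllegalArgumentException("end offset " + endOffset
                + " is less than the log start offset " + LOG_START_OFFSET);
        }
        if (epoch > CURRENT_EPOCH) {
            throw new IllegalArgumentException("epoch " + epoch
                + " is newer than the current leader epoch " + CURRENT_EPOCH);
        }
    }

    public static void main(String[] args) {
        validateSnapshotId(50, 3); // consistent with the log state above
    }
}
```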




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-10 Thread GitBox


jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r649320552



##
File path: metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java
##
@@ -74,56 +74,49 @@ String name() {
         this.batch = null;
         this.section = null;
         this.numRecords = 0;
-        this.numWriteTries = 0;
     }
 
     /**
      * Returns the epoch of the snapshot that we are generating.
      */
     long epoch() {
-        return writer.epoch();
+        return writer.lastOffset();

Review comment:
   Yes, but the names are not great. Updated the names of 
`SnapshotGenerator.epoch` and `SnapshotWriter.lastOffset` to 
`lastOffsetFromLog`. This should make it clear that the offsets of the batches 
in the snapshot are independent of the last offset from the log that is 
included in the snapshot.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] socutes commented on pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException

2021-06-10 Thread GitBox


socutes commented on pull request #10749:
URL: https://github.com/apache/kafka/pull/10749#issuecomment-858743167


   @hachikuji Please review the changes again! Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] socutes commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException

2021-06-10 Thread GitBox


socutes commented on a change in pull request #10749:
URL: https://github.com/apache/kafka/pull/10749#discussion_r649316231



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -68,15 +68,22 @@ public static Path snapshotPath(Path logDir, OffsetAndEpoch snapshotId) {
         return snapshotDir(logDir).resolve(filenameFromSnapshotId(snapshotId) + SUFFIX);
     }
 
-    public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws IOException {
+    public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) {
         Path dir = snapshotDir(logDir);
+        Path tempFile;
 
-        // Create the snapshot directory if it doesn't exists
-        Files.createDirectories(dir);
-
-        String prefix = String.format("%s-", filenameFromSnapshotId(snapshotId));
+        try {
+            // Create the snapshot directory if it doesn't exists
+            Files.createDirectories(dir);
 
-        return Files.createTempFile(dir, prefix, PARTIAL_SUFFIX);
+            String prefix = String.format("%s-", filenameFromSnapshotId(snapshotId));
+            tempFile = Files.createTempFile(dir, prefix, PARTIAL_SUFFIX);

Review comment:
   You're right! Thanks

##
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -78,7 +85,11 @@ public void append(MemoryRecords records) {
             checkIfFrozen("Append");
             Utils.writeFully(channel, records.buffer());
         } catch (IOException e) {
-            throw new RuntimeException(e);
+            throw new UncheckedIOException(
+                String.format("Error writing file snapshot," +

Review comment:
   Fixed!
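
   For reference, the wrapping pattern under discussion as a standalone sketch (directory layout and message text are illustrative only):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class UncheckedWrapExample {
    // Wrap the checked IOException in UncheckedIOException so callers are not
    // forced to declare `throws IOException` for failures they cannot handle.
    static Path createTempFileUnchecked(Path dir, String prefix) {
        try {
            Files.createDirectories(dir); // create the directory if needed
            return Files.createTempFile(dir, prefix, ".part");
        } catch (IOException e) {
            throw new UncheckedIOException(
                String.format("Error creating temporary file in %s", dir), e);
        }
    }

    public static void main(String[] args) {
        Path dir = Paths.get(System.getProperty("java.io.tmpdir"), "snapshots-demo");
        System.out.println(createTempFileUnchecked(dir, "0-0-"));
    }
}
```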




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12892) InvalidACLException thrown in tests caused jenkins build unstable

2021-06-10 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361026#comment-17361026
 ] 

Igor Soarez commented on KAFKA-12892:
-

Yes it was - by applying the ACL changes to a unique child znode instead of to 
the root, there shouldn't be any interference with other tests. I'm not sure 
whether the new test is still a problem or whether there's lingering state in 
ZooKeeper across builds. It is strange that only some test runs are affected. 
Disabling the test will let us know.

> InvalidACLException thrown in tests caused jenkins build unstable
> -
>
> Key: KAFKA-12892
> URL: https://issues.apache.org/jira/browse/KAFKA-12892
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Igor Soarez
>Priority: Major
> Attachments: image-2021-06-04-21-05-57-222.png
>
>
> In KAFKA-12866, we fixed the issue that Kafka required ZK root access even 
> when using a chroot. But after the PR was merged (build #183), the trunk 
> build keeps failing in at least one test group (mostly JDK 15 and Scala 
> 2.13). The build result says nothing useful:
> {code:java}
> > Task :core:integrationTest FAILED
> [2021-06-04T03:19:18.974Z] 
> [2021-06-04T03:19:18.974Z] FAILURE: Build failed with an exception.
> [2021-06-04T03:19:18.974Z] 
> [2021-06-04T03:19:18.974Z] * What went wrong:
> [2021-06-04T03:19:18.974Z] Execution failed for task ':core:integrationTest'.
> [2021-06-04T03:19:18.974Z] > Process 'Gradle Test Executor 128' finished with non-zero exit value 1
> [2021-06-04T03:19:18.974Z]   This problem might be caused by incorrect test process configuration.
> [2021-06-04T03:19:18.974Z]   Please refer to the test execution section in the User Manual at https://docs.gradle.org/7.0.2/userguide/java_testing.html#sec:test_execution
> {code}
>  
> After investigation, I found the tests failed because many 
> `InvalidACLException`s are thrown during the tests, e.g.:
>  
> {code:java}
> GssapiAuthenticationTest > testServerNotFoundInKerberosDatabase() FAILED
> [2021-06-04T02:25:45.419Z] org.apache.zookeeper.KeeperException$InvalidACLException: KeeperErrorCode = InvalidACL for /config/topics/__consumer_offsets
> [2021-06-04T02:25:45.419Z]         at org.apache.zookeeper.KeeperException.create(KeeperException.java:128)
> [2021-06-04T02:25:45.419Z]         at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
> [2021-06-04T02:25:45.419Z]         at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:583)
> [2021-06-04T02:25:45.419Z]         at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1729)
> [2021-06-04T02:25:45.419Z]         at kafka.zk.KafkaZkClient.createOrSet$1(KafkaZkClient.scala:366)
> [2021-06-04T02:25:45.419Z]         at kafka.zk.KafkaZkClient.setOrCreateEntityConfigs(KafkaZkClient.scala:376)
> [2021-06-04T02:25:45.419Z]         at kafka.zk.AdminZkClient.createTopicWithAssignment(AdminZkClient.scala:109)
> [2021-06-04T02:25:45.419Z]         at kafka.zk.AdminZkClient.createTopic(AdminZkClient.scala:60)
> [2021-06-04T02:25:45.419Z]         at kafka.utils.TestUtils$.$anonfun$createTopic$1(TestUtils.scala:357)
> [2021-06-04T02:25:45.419Z]         at kafka.utils.TestUtils$.createTopic(TestUtils.scala:848)
> [2021-06-04T02:25:45.419Z]         at kafka.utils.TestUtils$.createOffsetsTopic(TestUtils.scala:428)
> [2021-06-04T02:25:45.419Z]         at kafka.api.IntegrationTestHarness.doSetup(IntegrationTestHarness.scala:109)
> [2021-06-04T02:25:45.419Z]         at kafka.api.IntegrationTestHarness.setUp(IntegrationTestHarness.scala:84)
> [2021-06-04T02:25:45.419Z]         at kafka.server.GssapiAuthenticationTest.setUp(GssapiAuthenticationTest.scala:68)
> {code}
>  
> Log can be found 
> [here|https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka/branches/trunk/runs/195/nodes/14/steps/145/log/?start=0]
> After tracing back, I found it could be because we added a test in 
> KAFKA-12866 to lock root access in ZooKeeper, but somehow it didn't unlock 
> after testChrootExistsAndRootIsLocked. Also, all the InvalidACLException 
> test failures happened shortly after testChrootExistsAndRootIsLocked. For 
> example, testChrootExistsAndRootIsLocked below completed at 02:24:30, and 
> the failed test above ran at 02:25:45 (followed by more than 10 tests with 
> the same InvalidACLException).
> {code:java}
> [2021-06-04T02:24:29.370Z] ZkClientAclTest > 
> testChrootExistsAndRootIsLocked() STARTED
> [2021-06-04T02:24:30.321Z] 
> [2021-06-04T02:24:30.321Z] ZkClientAclTest > 
> testChrootExistsAndRootIsLocked() PASSED{code}
>  
> !image-2021-06-04-21-05-57-222.png|width=489,height=!
> We should have further 

[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-10 Thread GitBox


jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r649281362



##
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -1009,7 +999,7 @@ private QuorumController(LogContext logContext,
             snapshotRegistry, sessionTimeoutNs, replicaPlacer);
         this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
         this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
-        this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder);
+        this.snapshotGeneratorManager = new SnapshotGeneratorManager(raftClient::createSnapshot);

Review comment:
   Fair enough. Removing the `BiFunction` from the constructor. 
`SnapshotGeneratorManager` is an inner class so it should have access to the 
`raftClient`.
   
   > Was this done for testing or something?
   
   I am not sure why this was added. It is not used in tests. I think the 
previous code didn't have access to the `raftClient` because this code was 
merged before reversing the dependency between the `metadata` project and the 
`raft` project.
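
   As a standalone illustration of why the constructor argument is redundant for a non-static inner class (all names hypothetical):

```java
public class Outer {
    private final String raftClient = "raft-client";

    // Non-static inner class: it holds a reference to the enclosing Outer
    // instance, so it can read `raftClient` directly; no constructor
    // parameter is needed to pass it in.
    class SnapshotManager {
        String describe() {
            return "using " + raftClient;
        }
    }

    public static void main(String[] args) {
        Outer outer = new Outer();
        System.out.println(outer.new SnapshotManager().describe());
    }
}
```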




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-10 Thread GitBox


jsancio commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r649273990



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -233,18 +233,40 @@ final class KafkaMetadataLog private (
     log.topicId.get
   }
 
-  override def createSnapshot(snapshotId: OffsetAndEpoch): RawSnapshotWriter = {
-    // Do not let the state machine create snapshots older than the latest snapshot
-    latestSnapshotId().ifPresent { latest =>
-      if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
-        // Since snapshots are less than the high-watermark absolute offset comparison is okay.
-        throw new IllegalArgumentException(
-          s"Attempting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)"
-        )
-      }
+  override def createSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotWriter] = {
+    if (snapshots.contains(snapshotId)) {
+      Optional.empty()
+    } else {
+      Optional.of(FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this)))
+    }
+  }
+
+  override def createSnapshotFromEndOffset(endOffset: Long): Optional[RawSnapshotWriter] = {
+    val highWatermarkOffset = highWatermark.offset
+    if (endOffset > highWatermarkOffset) {
+      throw new IllegalArgumentException(
+        s"Cannot create a snapshot for an end offset ($endOffset) greater than the high-watermark ($highWatermarkOffset)"
+      )
+    }
+
+    if (endOffset < startOffset) {
+      throw new IllegalArgumentException(
+        s"Cannot create a snapshot for an end offset ($endOffset) less than the log start offset ($startOffset)"
+      )
+    }
+
+    val epoch = log.leaderEpochCache.flatMap(_.findEpochEntryByEndOffset(endOffset)) match {
+      case Some(epochEntry) =>
+        epochEntry.epoch
+      case None =>
+        // Assume that the end offset falls in the current epoch since based on the check above:

Review comment:
   I removed this code. To avoid scanning the leader epoch cache, I reverted 
the snapshot creation API so that both the offset and the epoch are passed to 
`createSnapshot`. The new code just validates that the given offset and epoch 
are valid according to the record batches in the log and the leader epoch 
cache.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF edited a comment on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

2021-06-10 Thread GitBox


IgnacioAcunaF edited a comment on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-858703098


   PING @hachikuji @apovzner (as I saw you on 
[KAFKA-9507](https://github.com/apache/kafka/pull/8057))


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

2021-06-10 Thread GitBox


IgnacioAcunaF commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-858703098


   PING @hachikuji @apovzner as I saw you on 
[KAFKA-9507](https://github.com/apache/kafka/pull/8057)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #10652: KAFKA-9726 IdentityReplicationPolicy

2021-06-10 Thread GitBox


mimaison commented on a change in pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#discussion_r649253273



##
File path: connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
##
@@ -159,4 +191,12 @@ public void remoteTopicsSeparatorTest() throws InterruptedException {
         assertTrue(remoteTopics.contains("source3__source4__source5__topic6"));
     }
 
+    public void testIdentityReplicationTopicSource() {

Review comment:
   Missing `@Test` annotation

##
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
##
@@ -492,7 +492,12 @@ boolean isCycle(String topic) {
         } else if (source.equals(sourceAndTarget.target())) {
             return true;
         } else {
-            return isCycle(replicationPolicy.upstreamTopic(topic));
+            String upstreamTopic = replicationPolicy.upstreamTopic(topic);
+            if (upstreamTopic.equals(topic)) {

Review comment:
   Can we cover this new branch with a test in `MirrorSourceConnectorTest`?

##
File path: connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
##
@@ -159,4 +191,12 @@ public void remoteTopicsSeparatorTest() throws InterruptedException {
         assertTrue(remoteTopics.contains("source3__source4__source5__topic6"));
     }
 
+    public void testIdentityReplicationTopicSource() {
+        MirrorClient client = new FakeMirrorClient(
+                new IdentityReplicationPolicy("primary"), Arrays.asList());
+        assertEquals("topic1", client.replicationPolicy()
+                .formatRemoteTopic("primary", "topic1"));

Review comment:
   Should we also try `formatRemoteTopic()` with a heartbeat topic?

##
File path: connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
##
@@ -60,7 +60,7 @@
     private ReplicationPolicy replicationPolicy;
     private Map consumerConfig;
 
-    public MirrorClient(Map props) {
+    public MirrorClient(Map props) {

Review comment:
   Is this actually needed?
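
   For context on the `isCycle` comment above, a standalone sketch of the recursion guard (a simplified model, not the MirrorMaker 2 API): with an identity policy, `upstreamTopic` returns the topic unchanged, so the equality check is what makes the recursion terminate.

```java
// Simplified model of the isCycle guard: an identity replication policy
// returns the topic unchanged, so without the equality check the recursion
// would never terminate.
public class CycleCheckExample {
    static String upstreamTopic(String topic) {
        return topic; // identity policy: no source prefix to strip
    }

    static boolean isCycle(String topic) {
        String upstream = upstreamTopic(topic);
        if (upstream == null || upstream.equals(topic)) {
            return false; // guard: nothing left to unwrap, so no cycle detected
        }
        return isCycle(upstream);
    }

    public static void main(String[] args) {
        System.out.println(isCycle("topic1")); // false, and terminates
    }
}
```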




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10856: MINOR: Small optimizations and removal of unused code in Streams

2021-06-10 Thread GitBox


jlprat commented on pull request #10856:
URL: https://github.com/apache/kafka/pull/10856#issuecomment-858694138


   Thanks both for the reviews


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna merged pull request #10856: MINOR: Small optimizations and removal of unused code in Streams

2021-06-10 Thread GitBox


cadonna merged pull request #10856:
URL: https://github.com/apache/kafka/pull/10856


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #10856: MINOR: Small optimizations and removal of unused code in Streams

2021-06-10 Thread GitBox


cadonna commented on pull request #10856:
URL: https://github.com/apache/kafka/pull/10856#issuecomment-858689473


   JDK 11 and ARM passed. Failed tests are unrelated and the issue is known. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on pull request #10848: MINOR Updated transaction index as optional in LogSegmentData.

2021-06-10 Thread GitBox


satishd commented on pull request #10848:
URL: https://github.com/apache/kafka/pull/10848#issuecomment-858686600


   Thanks @junrao @ijuma for the review. Addressed the review comments with the 
latest commit. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12892) InvalidACLException thrown in tests caused jenkins build unstable

2021-06-10 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17360983#comment-17360983
 ] 

Bruno Cadonna commented on KAFKA-12892:
---

Is PR #10821 supposed to solve the issue?

I still see a lot of 
{code:java}
MultipleListenersWithAdditionalJaasContextTest > testProduceConsume() FAILED
[2021-06-10T11:11:52.209Z] org.apache.zookeeper.KeeperException$InvalidACLException: KeeperErrorCode = InvalidACL for /brokers/ids
[2021-06-10T11:11:52.209Z]         at org.apache.zookeeper.KeeperException.create(KeeperException.java:128)
[2021-06-10T11:11:52.209Z]         at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
[2021-06-10T11:11:52.209Z]         at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:583)
[2021-06-10T11:11:52.209Z]         at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1729)
[2021-06-10T11:11:52.209Z]         at kafka.zk.KafkaZkClient.makeSurePersistentPathExists(KafkaZkClient.scala:1627)
[2021-06-10T11:11:52.209Z]         at kafka.zk.KafkaZkClient.$anonfun$createTopLevelPaths$1(KafkaZkClient.scala:1619)
[2021-06-10T11:11:52.209Z]         at kafka.zk.KafkaZkClient.$anonfun$createTopLevelPaths$1$adapted(KafkaZkClient.scala:1619)
[2021-06-10T11:11:52.209Z]         at scala.collection.immutable.List.foreach(List.scala:333)
[2021-06-10T11:11:52.209Z]         at kafka.zk.KafkaZkClient.createTopLevelPaths(KafkaZkClient.scala:1619)
[2021-06-10T11:11:52.209Z]         at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:454)
[2021-06-10T11:11:52.209Z]         at kafka.server.KafkaServer.startup(KafkaServer.scala:192)
[2021-06-10T11:11:52.209Z]         at kafka.utils.TestUtils$.createServer(TestUtils.scala:166)
[2021-06-10T11:11:52.209Z]         at kafka.server.MultipleListenersWithSameSecurityProtocolBaseTest.$anonfun$setUp$1(MultipleListenersWithSameSecurityProtocolBaseTest.scala:103)
[2021-06-10T11:11:52.210Z]         at kafka.server.MultipleListenersWithSameSecurityProtocolBaseTest.$anonfun$setUp$1$adapted(MultipleListenersWithSameSecurityProtocolBaseTest.scala:76)
[2021-06-10T11:11:52.210Z]         at scala.collection.immutable.Range.foreach(Range.scala:190)
{code}

Also on PRs that contain PR #10821. For example 
https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-10856/runs/3/nodes/14/steps/121/log/?start=0

> InvalidACLException thrown in tests caused jenkins build unstable
> -
>
> Key: KAFKA-12892
> URL: https://issues.apache.org/jira/browse/KAFKA-12892
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Igor Soarez
>Priority: Major
> Attachments: image-2021-06-04-21-05-57-222.png
>
>
> In KAFKA-12866, we fixed the issue that Kafka required ZK root access even 
> when using a chroot. But after the PR was merged (build #183), the trunk 
> build keeps failing in at least one test group (mostly JDK 15 and Scala 
> 2.13). The build result says nothing useful:
> {code:java}
> > Task :core:integrationTest FAILED
> [2021-06-04T03:19:18.974Z] 
> [2021-06-04T03:19:18.974Z] FAILURE: Build failed with an exception.
> [2021-06-04T03:19:18.974Z] 
> [2021-06-04T03:19:18.974Z] * What went wrong:
> [2021-06-04T03:19:18.974Z] Execution failed for task ':core:integrationTest'.
> [2021-06-04T03:19:18.974Z] > Process 'Gradle Test Executor 128' finished with non-zero exit value 1
> [2021-06-04T03:19:18.974Z]   This problem might be caused by incorrect test process configuration.
> [2021-06-04T03:19:18.974Z]   Please refer to the test execution section in the User Manual at https://docs.gradle.org/7.0.2/userguide/java_testing.html#sec:test_execution
> {code}
>  
> After investigation, I found the tests failed because many 
> `InvalidACLException`s are thrown during the tests, e.g.:
>  
> {code:java}
> GssapiAuthenticationTest > testServerNotFoundInKerberosDatabase() FAILED
> [2021-06-04T02:25:45.419Z] org.apache.zookeeper.KeeperException$InvalidACLException: KeeperErrorCode = InvalidACL for /config/topics/__consumer_offsets
> [2021-06-04T02:25:45.419Z]         at org.apache.zookeeper.KeeperException.create(KeeperException.java:128)
> [2021-06-04T02:25:45.419Z]         at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
> [2021-06-04T02:25:45.419Z]         at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:583)
> [2021-06-04T02:25:45.419Z]         at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1729)
> [2021-06-04T02:25:45.419Z]         at kafka.zk.KafkaZkClient.createOrSet$1(KafkaZkClient.scala:366)
> [2021-06-04T02:25:45.419Z]         at kafka.zk.KafkaZkClient.setOrCreateEntityConfigs(KafkaZkClient.scala:376)
> 

[GitHub] [kafka] mimaison commented on a change in pull request #9878: KAFKA-6987: Add KafkaFuture.toCompletionStage()

2021-06-10 Thread GitBox


mimaison commented on a change in pull request #9878:
URL: https://github.com/apache/kafka/pull/9878#discussion_r649241897



##
File path: clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
##
@@ -17,68 +17,261 @@
 package org.apache.kafka.common;
 
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Java;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /**
  * A unit test for KafkaFuture.
  */
 @Timeout(120)
 public class KafkaFutureTest {
 
+    /** Asserts that the given future is done, didn't fail and wasn't cancelled. */
+    private void assertIsSuccessful(KafkaFuture<?> future) {
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+        assertFalse(future.isCancelled());
+    }
+
+    /** Asserts that the given future is done, failed and wasn't cancelled. */
+    private void assertIsFailed(KafkaFuture<?> future) {
+        assertTrue(future.isDone());
+        assertFalse(future.isCancelled());
+        assertTrue(future.isCompletedExceptionally());
+    }
+
+    /** Asserts that the given future is done, didn't fail and was cancelled. */
+    private void assertIsCancelled(KafkaFuture<?> future) {
+        assertTrue(future.isDone());
+        assertTrue(future.isCancelled());
+        assertTrue(future.isCompletedExceptionally());
+    }
+
+    private <T> void awaitAndAssertResult(KafkaFuture<T> future,
+                                          T expectedResult,
+                                          T alternativeValue) {
+        assertNotEquals(expectedResult, alternativeValue);
+        try {
+            assertEquals(expectedResult, future.get(5, TimeUnit.MINUTES));
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            assertEquals(expectedResult, future.get());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            assertEquals(expectedResult, future.getNow(alternativeValue));
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+    }
+
+    private void awaitAndAssertFailure(KafkaFuture<?> future,
+                                       Class<? extends Throwable> expectedException,
+                                       String expectedMessage) {
+        try {
+            future.get(5, TimeUnit.MINUTES);
+            fail("Expected an exception");
+        } catch (ExecutionException e) {
+            assertEquals(expectedException, e.getCause().getClass());
+            assertEquals(expectedMessage, e.getCause().getMessage());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            future.get();
+            fail("Expected an exception");
+        } catch (ExecutionException e) {
+            assertEquals(expectedException, e.getCause().getClass());
+            assertEquals(expectedMessage, e.getCause().getMessage());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            future.getNow(null);
+            fail("Expected an exception");
+        } catch (ExecutionException e) {
+            assertEquals(expectedException, e.getCause().getClass());
+            assertEquals(expectedMessage, e.getCause().getMessage());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+    }
+
+
+    private void awaitAndAssertCancelled(KafkaFuture<?> future, String expectedMessage) {
+        try {
+            future.get(5, TimeUnit.MINUTES);
+            fail("Expected an exception");
+        } catch (CancellationException e) {
+            assertEquals(CancellationException.class, e.getClass());
+            assertEquals(expectedMessage, e.getMessage());
+        } catch (Exception e) {
+            throw new

[jira] [Commented] (KAFKA-12870) RecordAccumulator stuck in a flushing state

2021-06-10 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17360937#comment-17360937
 ] 

Ismael Juma commented on KAFKA-12870:
-

I think the claim is that there's a bug in the `Sender` when exactly-once is 
used.

> RecordAccumulator stuck in a flushing state
> ---
>
> Key: KAFKA-12870
> URL: https://issues.apache.org/jira/browse/KAFKA-12870
> Project: Kafka
>  Issue Type: Bug
>  Components: producer , streams
>Affects Versions: 2.5.1, 2.8.0, 2.7.1, 2.6.2
>Reporter: Niclas Lockner
>Priority: Major
> Fix For: 3.0.0
>
> Attachments: RecordAccumulator.log, full.log
>
>
> After a Kafka Streams application with exactly-once enabled has performed its first 
> commit, the RecordAccumulator within the stream's internal producer gets 
> stuck in a state where all subsequent ProducerBatches that get allocated are 
> immediately flushed instead of being held in memory until they expire, 
> regardless of the stream's linger or batch size config.
> This is reproduced in the example code found at 
> [https://github.com/niclaslockner/kafka-12870] which can be run with 
> ./gradlew run --args=
> The example has a producer that sends 1 record/sec to one topic, and a Kafka 
> stream with EOS enabled that forwards the records from that topic to another 
> topic with the configuration linger = 5 sec, commit interval = 10 sec.
>  
> The expected behavior when running the example is that the stream's 
> ProducerBatches will expire (or get flushed because of the commit) every 5th 
> second, and that the stream's producer will send a ProduceRequest every 5th 
> second with an expired ProducerBatch that contains 5 records.
> The actual behavior is that the ProducerBatch is made immediately available 
> for the Sender, and the Sender sends one ProduceRequest for each record.
>  
> The example code contains a copy of the RecordAccumulator class (copied from 
> kafka-clients 2.8.0) with some additional logging added to
>  * RecordAccumulator#ready(Cluster, long)
>  * RecordAccumulator#beginFlush()
>  * RecordAccumulator#awaitFlushCompletion()
> These log entries show (see the attached RecordsAccumulator.log)
>  * that the batches are considered sendable because a flush is in progress
>  * that Sender.maybeSendAndPollTransactionalRequest() calls 
> RecordAccumulator's beginFlush() without also calling awaitFlushCompletion(), 
> and that this makes RecordAccumulator's flushesInProgress jump between 1-2 
> instead of the expected 0-1.
>  
> This issue is not reproducible in version 2.3.1 or 2.4.1.
>  
>  
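
A minimal standalone model of the counter imbalance described above (a hypothetical sketch, not the actual RecordAccumulator code): each beginFlush() must be paired with awaitFlushCompletion(), otherwise flushesInProgress never drops back to 0 and batches are always considered sendable.

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

public class FlushCounterExample {
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);

    void beginFlush() {
        flushesInProgress.incrementAndGet();
    }

    // Pairing call that beginFlush() relies on; if a caller skips it,
    // flushesInProgress never returns to 0 and batches stay "sendable".
    void awaitFlushCompletion() {
        flushesInProgress.decrementAndGet();
    }

    boolean flushInProgress() {
        return flushesInProgress.get() > 0;
    }

    public static void main(String[] args) {
        FlushCounterExample acc = new FlushCounterExample();
        acc.beginFlush();          // the commit path calls this...
        // ...but never calls awaitFlushCompletion(), so:
        System.out.println(acc.flushInProgress()); // true, forever
    }
}
{code}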



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12870) RecordAccumulator stuck in a flushing state

2021-06-10 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12870:

Fix Version/s: 3.0.0

> RecordAccumulator stuck in a flushing state
> ---
>
> Key: KAFKA-12870
> URL: https://issues.apache.org/jira/browse/KAFKA-12870
> Project: Kafka
>  Issue Type: Bug
>  Components: producer , streams
>Affects Versions: 2.5.1, 2.8.0, 2.7.1, 2.6.2
>Reporter: Niclas Lockner
>Priority: Major
> Fix For: 3.0.0
>
> Attachments: RecordAccumulator.log, full.log
>
>
> After a Kafka Streams application with exactly-once enabled has performed its first 
> commit, the RecordAccumulator within the stream's internal producer gets 
> stuck in a state where all subsequent ProducerBatches that get allocated are 
> immediately flushed instead of being held in memory until they expire, 
> regardless of the stream's linger or batch size config.
> This is reproduced in the example code found at 
> [https://github.com/niclaslockner/kafka-12870] which can be run with 
> ./gradlew run --args=
> The example has a producer that sends 1 record/sec to one topic, and a Kafka 
> stream with EOS enabled that forwards the records from that topic to another 
> topic with the configuration linger = 5 sec, commit interval = 10 sec.
>  
> The expected behavior when running the example is that the stream's 
> ProducerBatches will expire (or get flushed because of the commit) every 5th 
> second, and that the stream's producer will send a ProduceRequest every 5th 
> second with an expired ProducerBatch that contains 5 records.
> The actual behavior is that the ProducerBatch is made immediately available 
> for the Sender, and the Sender sends one ProduceRequest for each record.
>  
> The example code contains a copy of the RecordAccumulator class (copied from 
> kafka-clients 2.8.0) with some additional logging added to
>  * RecordAccumulator#ready(Cluster, long)
>  * RecordAccumulator#beginFlush()
>  * RecordAccumulator#awaitFlushCompletion()
> These log entries show (see the attached RecordsAccumulator.log)
>  * that the batches are considered sendable because a flush is in progress
>  * that Sender.maybeSendAndPollTransactionalRequest() calls 
> RecordAccumulator's beginFlush() without also calling awaitFlushCompletion(), 
> and that this makes RecordAccumulator's flushesInProgress jump between 1-2 
> instead of the expected 0-1.
>  
> This issue is not reproducible in version 2.3.1 or 2.4.1.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dongjinleekr commented on pull request #10847: KAFKA-12921: Upgrade ZSTD JNI from 1.4.9-1 to 1.5.0-1

2021-06-10 Thread GitBox


dongjinleekr commented on pull request #10847:
URL: https://github.com/apache/kafka/pull/10847#issuecomment-858653169


   @ijuma @dchristle Since we have more time for 
[KIP-390](https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Allow+fine-grained+configuration+for+compression),
 I will run the benchmark with this zstd binding. Stay tuned!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #10860: MINOR: fix client_compatibility_features_test.py - DescribeAcls is al…

2021-06-10 Thread GitBox


chia7712 merged pull request #10860:
URL: https://github.com/apache/kafka/pull/10860


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-10 Thread GitBox


mimaison commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r649213934



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -858,6 +885,12 @@ public void onSuccess(ClientResponse resp, RequestFuture future) {
         public void onFailure(RuntimeException e, RequestFuture future) {
             log.debug("FindCoordinator request failed due to {}", e.toString());
 
+            if (e instanceof UnsupportedBatchLookupException) {

Review comment:
   I've only taken a very brief look and I think this approach would work 
well for Connect, Producer and Consumer; however, it's a bit more complicated 
with Admin.
   
   In Admin, requests are built by lookup strategies. Lookups can be sent to 
any broker, so knowing the max version for a specific call is not completely 
trivial. That said, it's not impossible either, so if there's consensus that 
this would be preferable, I can give it a try.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10860: MINOR: fix client_compatibility_features_test.py - DescribeAcls is al…

2021-06-10 Thread GitBox


chia7712 commented on pull request #10860:
URL: https://github.com/apache/kafka/pull/10860#issuecomment-858649375


   > Do the system tests pass with this change?
   
   Yep. This system test shows the following error message without this patch:
   ```
   java.lang.RuntimeException: Did not expect describeAclsSupported to be supported, but it was.
        at org.apache.kafka.tools.ClientCompatibilityTest.tryFeature(ClientCompatibilityTest.java:525)
        at org.apache.kafka.tools.ClientCompatibilityTest.tryFeature(ClientCompatibilityTest.java:509)
        at org.apache.kafka.tools.ClientCompatibilityTest.testAdminClient(ClientCompatibilityTest.java:301)
        at org.apache.kafka.tools.ClientCompatibilityTest.run(ClientCompatibilityTest.java:238)
        at org.apache.kafka.tools.ClientCompatibilityTest.main(ClientCompatibilityTest.java:191)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12929) KIP-750: Deprecate support for Java 8 in Kafka 3.0

2021-06-10 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12929:

Summary: KIP-750: Deprecate support for Java 8 in Kafka 3.0  (was: KIP-750: 
Deprecate Java 8 in Kafka 3.0)

> KIP-750: Deprecate support for Java 8 in Kafka 3.0
> --
>
> Key: KAFKA-12929
> URL: https://issues.apache.org/jira/browse/KAFKA-12929
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12930) KIP-751: Deprecate support for Scala 2.12 in Kafka 3.0

2021-06-10 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12930:

Summary: KIP-751: Deprecate support for Scala 2.12 in Kafka 3.0  (was: 
Deprecate support for Scala 2.12 in Kafka 3.0)

> KIP-751: Deprecate support for Scala 2.12 in Kafka 3.0
> --
>
> Key: KAFKA-12930
> URL: https://issues.apache.org/jira/browse/KAFKA-12930
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12930) Deprecate support for Scala 2.12 in Kafka 3.0

2021-06-10 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-12930:
---

 Summary: Deprecate support for Scala 2.12 in Kafka 3.0
 Key: KAFKA-12930
 URL: https://issues.apache.org/jira/browse/KAFKA-12930
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma
Assignee: Ismael Juma
 Fix For: 3.0.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12929) KIP-750: Deprecate Java 8 in Kafka 3.0

2021-06-10 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-12929:
---

 Summary: KIP-750: Deprecate Java 8 in Kafka 3.0
 Key: KAFKA-12929
 URL: https://issues.apache.org/jira/browse/KAFKA-12929
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma
Assignee: Ismael Juma
 Fix For: 3.0.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

