[GitHub] [kafka] mjsax merged pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

2021-06-09 Thread GitBox


mjsax merged pull request #10810:
URL: https://github.com/apache/kafka/pull/10810


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on a change in pull request #10856: MINOR: Small optimizations and removal of unused code in Streams

2021-06-09 Thread GitBox


jlprat commented on a change in pull request #10856:
URL: https://github.com/apache/kafka/pull/10856#discussion_r648867754



##
File path: 
streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java
##
@@ -25,8 +25,8 @@
  */
 public class StoreQueryParameters {
 
-private Integer partition;
-private boolean staleStores;
+private final Integer partition;
+private final boolean staleStores;

Review comment:
   As far as I can see, these fields are only assigned in the constructor and 
are never modified. That, together with in-place initialization, covers the only 
valid patterns for final fields.
   Or am I missing something?
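
   For reference, a minimal standalone sketch (a hypothetical class, not the actual 
StoreQueryParameters) of the pattern being described, i.e. fields assigned exactly 
once in the constructor and never reassigned:
   ```java
   // Illustrative sketch only: fields that are assigned exactly once, in the
   // constructor, and never modified afterwards can (and should) be declared final.
   public class QueryParams {

       private final Integer partition;   // set only in the constructor
       private final boolean staleStores; // set only in the constructor

       public QueryParams(final Integer partition, final boolean staleStores) {
           this.partition = partition;
           this.staleStores = staleStores;
       }

       public Integer partition() {
           return partition;
       }

       public boolean staleStores() {
           return staleStores;
       }
   }
   ```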




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10855: MINOR: clean up unneeded `@SuppressWarnings` on Streams module

2021-06-09 Thread GitBox


jlprat commented on pull request #10855:
URL: https://github.com/apache/kafka/pull/10855#issuecomment-858321394


   Failures seem to be:
   > RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   
   And
   
   > SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
   
   
   While doing this PR, I also encountered something similar to what you mention @mjsax: 
somehow IntelliJ needed fewer `@SuppressWarnings` than `javac`. It seems all the 
ones I removed weren't really needed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12925) prefixScan missing from intermediate interfaces

2021-06-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17360548#comment-17360548
 ] 

Matthias J. Sax commented on KAFKA-12925:
-

Thanks [~sagarrao]!

Maybe the easiest way is to remove the default implementation and see where it 
fails to compile. After all issues are fixed and it compiles again you can add 
back the default implementation. We might also want to add more tests?

[~mviamari] Could you share the code that triggered the issue as a starting 
point for writing tests?

> prefixScan missing from intermediate interfaces
> ---
>
> Key: KAFKA-12925
> URL: https://issues.apache.org/jira/browse/KAFKA-12925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Michael Viamari
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.0.0, 2.8.1
>
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] 
> introduced support for {{prefixScan}} to StateStores.
> It seems that many of the intermediate {{StateStore}} interfaces are missing 
> a definition for {{prefixScan}}, and as such it is not accessible in all cases.
> For example, when accessing the state stores through the processor context, 
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not 
> define {{prefixScan}} and it falls back to the default implementation in 
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] Clarencezero closed pull request #10859: Controller source code analysis: added detailed comments to the core Controller components

2021-06-09 Thread GitBox


Clarencezero closed pull request #10859:
URL: https://github.com/apache/kafka/pull/10859


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Clarencezero opened a new pull request #10859: Controller source code analysis: added detailed comments to the core Controller components

2021-06-09 Thread GitBox


Clarencezero opened a new pull request #10859:
URL: https://github.com/apache/kafka/pull/10859


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-09 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r648811500



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -254,8 +255,14 @@ public synchronized int sendFetches() {
 for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
 final Node fetchTarget = entry.getKey();
 final FetchSessionHandler.FetchRequestData data = entry.getValue();
+final short maxVersion;
+if (!data.canUseTopicIds()) {
+maxVersion = (short) 12;
+} else {
+maxVersion = ApiKeys.FETCH.latestVersion();
+}
 final FetchRequest.Builder request = FetchRequest.Builder
-.forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
+.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, data.toSend(), metadata.topicIds())

Review comment:
   I noticed that I get topic IDs from metadata here, while in the replica 
fetcher thread I get them from the metadata cache. I don't think it is a big deal 
since we add to the fetchData using the same source, but it might make sense to 
use FetchRequestData's topicIds() instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-12925) prefixScan missing from intermediate interfaces

2021-06-09 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17360497#comment-17360497
 ] 

Sagar Rao edited comment on KAFKA-12925 at 6/10/21, 2:31 AM:
-

Sure [~ableegoldman], [~mjsax] I have assigned it to myself. Will take it up.

TBH I did struggle to figure out all the places where this method should be implemented, 
and having a default implementation masked the misses. What I will do is find 
all places where a method like range() has been implemented and try to add it 
there if I can. Would that work?


was (Author: sagarrao):
Sure [~ableegoldman], [~mjsax] I have assigned it to myself. Will take it up..

> prefixScan missing from intermediate interfaces
> ---
>
> Key: KAFKA-12925
> URL: https://issues.apache.org/jira/browse/KAFKA-12925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Michael Viamari
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.0.0, 2.8.1
>
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] 
> introduced support for {{prefixScan}} to StateStores.
> It seems that many of the intermediate {{StateStore}} interfaces are missing 
> a definition for {{prefixScan}}, and as such it is not accessible in all cases.
> For example, when accessing the state stores through the processor context, 
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not 
> define {{prefixScan}} and it falls back to the default implementation in 
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12925) prefixScan missing from intermediate interfaces

2021-06-09 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17360497#comment-17360497
 ] 

Sagar Rao edited comment on KAFKA-12925 at 6/10/21, 2:21 AM:
-

Sure [~ableegoldman], [~mjsax] I have assigned it to myself. Will take it up..


was (Author: sagarrao):
Sure [~ableegoldman], I have assigned it to myself. Will take it up and try to 
come up with a fix.

> prefixScan missing from intermediate interfaces
> ---
>
> Key: KAFKA-12925
> URL: https://issues.apache.org/jira/browse/KAFKA-12925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Michael Viamari
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.0.0, 2.8.1
>
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] 
> introduced support for {{prefixScan}} to StateStores.
> It seems that many of the intermediate {{StateStore}} interfaces are missing 
> a definition for {{prefixScan}}, and as such it is not accessible in all cases.
> For example, when accessing the state stores through the processor context, 
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not 
> define {{prefixScan}} and it falls back to the default implementation in 
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12925) prefixScan missing from intermediate interfaces

2021-06-09 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17360497#comment-17360497
 ] 

Sagar Rao commented on KAFKA-12925:
---

Sure [~ableegoldman], I have assigned it to myself. Will take it up and try to 
come up with a fix.

> prefixScan missing from intermediate interfaces
> ---
>
> Key: KAFKA-12925
> URL: https://issues.apache.org/jira/browse/KAFKA-12925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Michael Viamari
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.0.0, 2.8.1
>
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] 
> introduced support for {{prefixScan}} to StateStores.
> It seems that many of the intermediate {{StateStore}} interfaces are missing 
> a definition for {{prefixScan}}, and as such it is not accessible in all cases.
> For example, when accessing the state stores through the processor context, 
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not 
> define {{prefixScan}} and it falls back to the default implementation in 
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12925) prefixScan missing from intermediate interfaces

2021-06-09 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-12925:
-

Assignee: Sagar Rao

> prefixScan missing from intermediate interfaces
> ---
>
> Key: KAFKA-12925
> URL: https://issues.apache.org/jira/browse/KAFKA-12925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Michael Viamari
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.0.0, 2.8.1
>
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] 
> introduced support for {{prefixScan}} to StateStores.
> It seems that many of the intermediate {{StateStore}} interfaces are missing 
> a definition for {{prefixScan}}, and as such it is not accessible in all cases.
> For example, when accessing the state stores through the processor context, 
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not 
> define {{prefixScan}} and it falls back to the default implementation in 
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9108) Flaky Test LogCleanerIntegrationTest#testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics

2021-06-09 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17360487#comment-17360487
 ] 

dengziming commented on KAFKA-9108:
---

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10852/1/testReport/junit/kafka.log/LogCleanerIntegrationTest/Build___JDK_11_and_Scala_2_13___testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics__/

> Flaky Test 
> LogCleanerIntegrationTest#testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics
> ---
>
> Key: KAFKA-9108
> URL: https://issues.apache.org/jira/browse/KAFKA-9108
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/26219/testReport/junit/kafka.log/LogCleanerIntegrationTest/testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics/]
> {quote}org.scalatest.exceptions.TestFailedException: There should be 2 
> uncleanable partitions at 
> org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:530)
>  at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) 
> at org.scalatest.Assertions$class.fail(Assertions.scala:1091) at 
> org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
> kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:842) at 
> kafka.log.LogCleanerIntegrationTest.testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics(LogCleanerIntegrationTest.scala:80){quote}
> STDOUT
> {quote}[2019-10-29 01:05:08,296] ERROR [kafka-log-cleaner-thread-0]: Error 
> due to (kafka.log.LogCleaner:76) java.lang.InterruptedException at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277) at 
> kafka.utils.ShutdownableThread.pause(ShutdownableThread.scala:82) at 
> kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:315) at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12927) Kafka account no authentication failure anti-theft cracking mechanism

2021-06-09 Thread SingleThread (Jira)
SingleThread created KAFKA-12927:


 Summary: Kafka account no authentication failure anti-theft 
cracking mechanism
 Key: KAFKA-12927
 URL: https://issues.apache.org/jira/browse/KAFKA-12927
 Project: Kafka
  Issue Type: Improvement
Reporter: SingleThread
 Attachments: image-2021-06-10-09-43-32-739.png

When Kafka account authentication fails, there is no policy similar to 
account lockout, so there is a security problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12920) Consumer's cooperative sticky assignor need to clear generation / assignment data upon `onPartitionsLost`

2021-06-09 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12920.

Resolution: Not A Bug

> Consumer's cooperative sticky assignor need to clear generation / assignment 
> data upon `onPartitionsLost`
> -
>
> Key: KAFKA-12920
> URL: https://issues.apache.org/jira/browse/KAFKA-12920
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: bug, consumer
>
> Consumer's cooperative-sticky assignor does not track the owned partitions 
> inside the assignor --- i.e. when it resets its state in the event of 
> ``onPartitionsLost``, the ``memberAssignment`` and ``generation`` inside the 
> assignor are not cleared. This causes a member to join with an empty 
> generation on the protocol while its user data still encodes the old 
> assignment (and hence passes the validation check on the broker side during 
> JoinGroup), and eventually causes a single partition to be assigned to 
> multiple consumers within a generation.
> We should let the assignor also clear its assignment/generation when 
> ``onPartitionsLost`` is triggered in order to avoid this scenario.
> Note that 1) for the regular sticky assignor the generation would still have 
> an older value, and this would cause the previously owned partitions to be 
> discarded during the assignment, and 2) for Streams' sticky assignor, its 
> encoding would indeed be cleared along with ``onPartitionsLost``. Hence only 
> Consumer's cooperative-sticky assignor has this issue to solve.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12920) Consumer's cooperative sticky assignor need to clear generation / assignment data upon `onPartitionsLost`

2021-06-09 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17360480#comment-17360480
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12920:


I believe we've found the real issue, and are just discussing what the correct 
behavior here should be. I'm going to close this ticket as 'Not a Bug' and file 
a separate issue with the problems we've found in the 
Consumer/AbstractCoordinator that have resulted in 
[this|https://issues.apache.org/jira/browse/KAFKA-12896] odd behavior we 
observed with the cooperative-sticky assignor

> Consumer's cooperative sticky assignor need to clear generation / assignment 
> data upon `onPartitionsLost`
> -
>
> Key: KAFKA-12920
> URL: https://issues.apache.org/jira/browse/KAFKA-12920
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: bug, consumer
>
> Consumer's cooperative-sticky assignor does not track the owned partitions 
> inside the assignor --- i.e. when it resets its state in the event of 
> ``onPartitionsLost``, the ``memberAssignment`` and ``generation`` inside the 
> assignor are not cleared. This causes a member to join with an empty 
> generation on the protocol while its user data still encodes the old 
> assignment (and hence passes the validation check on the broker side during 
> JoinGroup), and eventually causes a single partition to be assigned to 
> multiple consumers within a generation.
> We should let the assignor also clear its assignment/generation when 
> ``onPartitionsLost`` is triggered in order to avoid this scenario.
> Note that 1) for the regular sticky assignor the generation would still have 
> an older value, and this would cause the previously owned partitions to be 
> discarded during the assignment, and 2) for Streams' sticky assignor, its 
> encoding would indeed be cleared along with ``onPartitionsLost``. Hence only 
> Consumer's cooperative-sticky assignor has this issue to solve.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] IgnacioAcunaF opened a new pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

2021-06-09 Thread GitBox


IgnacioAcunaF opened a new pull request #10858:
URL: https://github.com/apache/kafka/pull/10858


   **Jira:**: https://issues.apache.org/jira/browse/KAFKA-12926
   
   Instead of setting partitions with negative offsets to "null" (as in KAFKA-9507), 
this PR aims to skip those cases from the returned list, because setting them 
to "null" can cause java.lang.NullPointerExceptions in downstream methods that 
try to access the data, since they expect an 
_OffsetAndMetadata_ and encounter null values instead.
   
   For example, at ConsumerGroupCommand.scala at core:
   ```
 val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
   .map { topicPartition =>
     topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
   }.toMap
   ```
   If topicPartition has a negative offset, 
committedOffsets.get(topicPartition) is null (as defined in KAFKA-9507), which 
translates into null.map(_.offset) and leads to: _Error: Executing 
consumer group command failed due to null
   java.lang.NullPointerException_
   
   Unit tests were added to assert that topic partitions with an INVALID_OFFSET 
are not included in the returned list of the consumer group's offsets, so that 
downstream methods receive only valid _OffsetAndMetadata_ information.
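
   For illustration only, a minimal Java sketch (a hypothetical helper, not the actual 
Scala change in this PR) of the skip-instead-of-null idea described above:
   ```java
   import java.util.Map;
   import java.util.stream.Collectors;

   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.TopicPartition;

   public final class OffsetFiltering {

       // Keep only partitions with a valid (non-null, non-negative) committed offset,
       // so downstream code never has to handle null OffsetAndMetadata values.
       public static Map<TopicPartition, OffsetAndMetadata> withoutInvalidOffsets(
               final Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
           return committedOffsets.entrySet().stream()
                   .filter(e -> e.getValue() != null && e.getValue().offset() >= 0)
                   .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
       }
   }
   ```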
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-12925) prefixScan missing from intermediate interfaces

2021-06-09 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17360472#comment-17360472
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-12925 at 6/10/21, 12:51 AM:
---

[~sagarrao] do you think you'll be able to get in a fix for this in the next 
few weeks? It would be good to have this working in full capacity by 3.0

(I wouldn't consider it a blocker for 3.0, but we have enough time before code 
freeze that this should not be an issue, provided someone can pick it up)


was (Author: ableegoldman):
[~sagarrao] do you think you'll be able to get in a fix for this in the next 
few weeks? It would be good to have this working in full capacity by 3.0

> prefixScan missing from intermediate interfaces
> ---
>
> Key: KAFKA-12925
> URL: https://issues.apache.org/jira/browse/KAFKA-12925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Michael Viamari
>Priority: Major
> Fix For: 3.0.0, 2.8.1
>
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] 
> introduced support for {{prefixScan}} to StateStores.
> It seems that many of the intermediate {{StateStore}} interfaces are missing 
> a definition for {{prefixScan}}, and as such it is not accessible in all cases.
> For example, when accessing the state stores through the processor context, 
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not 
> define {{prefixScan}} and it falls back to the default implementation in 
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12925) prefixScan missing from intermediate interfaces

2021-06-09 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17360472#comment-17360472
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12925:


[~sagarrao] do you think you'll be able to get in a fix for this in the next 
few weeks? It would be good to have this working in full capacity by 3.0

> prefixScan missing from intermediate interfaces
> ---
>
> Key: KAFKA-12925
> URL: https://issues.apache.org/jira/browse/KAFKA-12925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Michael Viamari
>Priority: Major
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] 
> introduced support for {{prefixScan}} to StateStores.
> It seems that many of the intermediate {{StateStore}} interfaces are missing 
> a definition for {{prefixScan}}, and as such it is not accessible in all cases.
> For example, when accessing the state stores through the processor context, 
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not 
> define {{prefixScan}} and it falls back to the default implementation in 
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12925) prefixScan missing from intermediate interfaces

2021-06-09 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12925:
---
Fix Version/s: 2.8.1
   3.0.0

> prefixScan missing from intermediate interfaces
> ---
>
> Key: KAFKA-12925
> URL: https://issues.apache.org/jira/browse/KAFKA-12925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Michael Viamari
>Priority: Major
> Fix For: 3.0.0, 2.8.1
>
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] 
> introduced support for {{prefixScan}} to StateStores.
> It seems that many of the intermediate {{StateStore}} interfaces are missing 
> a definition for {{prefixScan}}, and as such it is not accessible in all cases.
> For example, when accessing the state stores through the processor context, 
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not 
> define {{prefixScan}} and it falls back to the default implementation in 
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] satishd commented on a change in pull request #10848: MINOR Updated transaction index as optional in LogSegmentData.

2021-06-09 Thread GitBox


satishd commented on a change in pull request #10848:
URL: https://github.com/apache/kafka/pull/10848#discussion_r648768897



##
File path: 
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/LogSegmentData.java
##
@@ -33,31 +34,32 @@
 private final Path logSegment;
 private final Path offsetIndex;
 private final Path timeIndex;
-private final Path txnIndex;
+private final Optional<Path> transactionIndex;
 private final Path producerSnapshotIndex;
 private final ByteBuffer leaderEpochIndex;
 
 /**
  * Creates a LogSegmentData instance with data and indexes.
- *  @param logSegment    actual log segment file
+ *
+ * @param logSegment    actual log segment file
  * @param offsetIndex   offset index file
  * @param timeIndex time index file
- * @param txnIndex  transaction index file
+ * @param transactionIndex  transaction index file, which can be null
  * @param producerSnapshotIndex producer snapshot until this segment
  * @param leaderEpochIndex  leader-epoch-index until this segment
  */
 public LogSegmentData(Path logSegment,
   Path offsetIndex,
   Path timeIndex,
-  Path txnIndex,
+  Path transactionIndex,

Review comment:
   Passing `Optional` as an argument is not considered good practice. We 
still need to do a null check on that Optional instance. 
   [This SO answer from Brian 
Goetz](https://stackoverflow.com/questions/26327957/should-java-8-getters-return-optional-type/26328555#26328555)
 describes the right usage of `Optional`. I have also updated the PR not to use it 
as a field.
   
   ```
   You should almost never use it as a field of something or a method parameter.
   ```
   
   The [Javadoc of 
Optional](https://docs.oracle.com/javase/10/docs/api/java/util/Optional.html) suggests 
using it as a return type, as mentioned below.
   ```
   API Note:
   Optional is primarily intended for use as a method return type where there 
is a clear need to represent "no result," and where using null is likely to 
cause errors. A variable whose type is Optional should never itself be null; it 
should always point to an Optional instance.
   ```
   
   I do not have strong opinions on this; I am fine with whatever conventions 
we are following in this project on `Optional` usage, if we have any. 
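
   As a hedged illustration of that convention (a hypothetical class, not the actual 
LogSegmentData), the constructor can take a plain, possibly-null parameter or offer an 
overload, while the getter returns an Optional:
   ```java
   import java.nio.file.Path;
   import java.util.Optional;

   // Illustrative sketch only: accept a possibly-null Path (or use the overload),
   // store it as a plain field, and expose it through an Optional-returning getter.
   public final class SegmentFiles {

       private final Path logSegment;
       private final Path transactionIndex; // null when the segment has no transaction index

       public SegmentFiles(final Path logSegment, final Path transactionIndex) {
           this.logSegment = logSegment;
           this.transactionIndex = transactionIndex;
       }

       // Convenience overload for segments without a transaction index.
       public SegmentFiles(final Path logSegment) {
           this(logSegment, null);
       }

       public Path logSegment() {
           return logSegment;
       }

       public Optional<Path> transactionIndex() {
           return Optional.ofNullable(transactionIndex);
       }
   }
   ```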




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12926) ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

2021-06-09 Thread Ignacio Acuna (Jira)
Ignacio Acuna created KAFKA-12926:
-

 Summary: ConsumerGroupCommand's java.lang.NullPointerException at 
negative offsets while running kafka-consumer-groups.sh
 Key: KAFKA-12926
 URL: https://issues.apache.org/jira/browse/KAFKA-12926
 Project: Kafka
  Issue Type: Bug
  Components: admin, clients
Reporter: Ignacio Acuna
Assignee: Ignacio Acuna


Hi everybody, hope everyone is doing great.

*i) Introduction:*
I noticed the following exception (on a cluster with brokers running 2.3.1) 
when trying to describe a consumer group (using Kafka 2.7.1):

 
{code:java}
./kafka-consumer-groups.sh --describe --group order-validations{code}
{code:java}
Error: Executing consumer group command failed due to null
java.lang.NullPointerException
 at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$6(ConsumerGroupCommand.scala:579)
 at 
scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99)
 at 
scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86)
 at 
scala.collection.convert.JavaCollectionWrappers$JSetWrapper.map(JavaCollectionWrappers.scala:180)
 at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$5(ConsumerGroupCommand.scala:578)
 at scala.collection.immutable.List.flatMap(List.scala:293)
 at scala.collection.immutable.List.flatMap(List.scala:79)
 at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$2(ConsumerGroupCommand.scala:574)
 at scala.collection.Iterator$$anon$9.next(Iterator.scala:575)
 at scala.collection.mutable.Growable.addAll(Growable.scala:62)
 at scala.collection.mutable.Growable.addAll$(Growable.scala:59)
 at scala.collection.mutable.HashMap.addAll(HashMap.scala:117)
 at scala.collection.mutable.HashMap$.from(HashMap.scala:570)
 at scala.collection.mutable.HashMap$.from(HashMap.scala:563)
 at scala.collection.MapOps$WithFilter.map(Map.scala:358)
 at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:569)
 at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:369)
 at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:76)
 at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
 at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala){code}
 

When trying with an older version of the AdminClient (2.3.1):
{code:java}
Error: Executing consumer group command failed due to 
java.lang.IllegalArgumentException: Invalid negative offset
java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: 
Invalid negative offset
 at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
 at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
 at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
 at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
 at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService.getCommittedOffsets(ConsumerGroupCommand.scala:595)
 at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$2(ConsumerGroupCommand.scala:421)
 at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$Lambda$131/4CB1EFD0.apply(Unknown
 Source)
 at 
scala.collection.TraversableLike$WithFilter.$anonfun$map$2(TraversableLike.scala:827)
 at 
scala.collection.TraversableLike$WithFilter$$Lambda$132/4CD49E20.apply(Unknown
 Source)
 at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
 at scala.collection.mutable.HashMap$$Lambda$133/4CD4A4F0.apply(Unknown 
Source)
 at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
 at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
 at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:826)
 at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:419)
 at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:312)
 at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
 at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: java.lang.IllegalArgumentException: Invalid negative offset
 at 
org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
 at 
org.apache.kafka.clients.admin.KafkaAdminClient$24$1.handleResponse(KafkaAdminClient.java:2832)
 at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1032)
 at 

[GitHub] [kafka] cmccabe opened a new pull request #10857: MINOR: Create SnapshotWriter and SnapshotReader interfaces

2021-06-09 Thread GitBox


cmccabe opened a new pull request #10857:
URL: https://github.com/apache/kafka/pull/10857


   Previously, we had an interface named
   org.apache.kafka.controller.SnapshotWriter and a concrete implementation
   class called org.apache.kafka.snapshot.SnapshotWriter.
   
   As part of unit tests, it is often helpful to be able to use an
   interface to mock snapshot writing and reading. Therefore, we should
   make the interface class part of the Raft package, not the controller
   package. This PR moves SnapshotWriter into that package and renames the
   concrete class to RaftSnapshotWriter.
   
   This PR also harmonizes the two classes. For example, "completeSnapshot"
   becomes "freeze" (since that's what it was in the raft package).
   Similarly, "writeBatch" becomes "append".  SnapshotWriter is now
   templated on the record type, in order to be generic, rather than
   specific to metadata.
   
   The controller code sometimes refers to the snapshot's end offset as its
   "epoch". This is confusing since there is also the concept of a raft log
   epoch. I have tried to remove most of these uses and replace them with
   "endOffset", which is more descriptive.
   
   I also fixed an off-by-one error where we advanced lastCommittedOffset
   to be just before a snapshot's endOffset, rather than at a snapshot end
   offset.
   
   Finally, I removed snapshotId from the reader and writer interface.
   Since snapshots are only ever taken (or indeed read) from committed
   offsets, there is no need for the controller or broker to supply this
   information to the raft layer. The end offset is sufficient. The raft
   layer can look up the relevant information if there is any need for it.
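
   As a rough sketch only (the append/freeze naming follows the description above; the 
type name and signatures are assumptions for illustration), a record-type-generic 
writer interface could take roughly this shape:
   ```java
   import java.util.List;

   // Hypothetical sketch of a snapshot writer that is generic over the record type,
   // rather than being tied to metadata records.
   public interface GenericSnapshotWriter<T> extends AutoCloseable {

       // Offset of the last committed record covered by this snapshot.
       long endOffset();

       // Append a batch of records to the snapshot.
       void append(List<T> records);

       // Complete the snapshot and make it immutable and visible to readers.
       void freeze();

       // Release resources; a snapshot that was never frozen is discarded.
       @Override
       void close();
   }
   ```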


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10501) Log Cleaner never clean up some __consumer_offsets partitions

2021-06-09 Thread Victor Garcia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17360469#comment-17360469
 ] 

Victor Garcia commented on KAFKA-10501:
---

[~mbaluta] do you think this issue is the same as explained in this comment? 
https://issues.apache.org/jira/browse/KAFKA-8335?focusedCommentId=16870892&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16870892
 

 

I wonder how you got the content of the example: is this just the dump of a log 
file, or were you just consuming from the topic?

When I dump the log file I don't see those ABORT and COMMIT messages.

> Log Cleaner never clean up some __consumer_offsets partitions
> -
>
> Key: KAFKA-10501
> URL: https://issues.apache.org/jira/browse/KAFKA-10501
> Project: Kafka
>  Issue Type: Bug
>  Components: log, log cleaner
>Affects Versions: 2.5.0
>Reporter: Mykhailo Baluta
>Priority: Major
>
> Some __consumer_offsets partitions contain "broken" messages in the second 
> log segment.
> Example: 
> {code:java}
> offset: 745253728 position: 49793647 CreateTime: 1594539245536 isvalid: true 
> keysize: 99 valuesize: 28 magic: 2 compresscodec: NONE producerId: 37146 
> producerEpoch: 0 sequence: 0 isTransactional: true headerKeys: []
> offset: 745253729 position: 49793844 CreateTime: 1594539245548 isvalid: true 
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 37146 
> producerEpoch: 0 sequence: -1 isTransactional: true headerKeys: [] 
> endTxnMarker: COMMIT coordinatorEpoch: 59
> offset: 745256523 position: 50070884 CreateTime: 1594540927673 isvalid: true 
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 37146 
> producerEpoch: 1 sequence: -1 isTransactional: true headerKeys: [] 
> endTxnMarker: ABORT coordinatorEpoch: 59
> offset: 745256543 position: 50073185 CreateTime: 1594541667798 isvalid: true 
> keysize: 99 valuesize: 28 magic: 2 compresscodec: NONE producerId: 37146 
> producerEpoch: 0 sequence: 0 isTransactional: true headerKeys: []
> {code}
>  Seems like the last 2 records are stored in the wrong order. As a result the 
> last message is transactional and not any ABORT/COMMIT message after. It 
> leads to a producer state with ongoing transactions and 
> firstUncleanableDirtyOffset = 745256543. Thus, compaction always skips for 
> such topic partitions.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-4327) Move Reset Tool from core to streams

2021-06-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4327:
---
Description: 
This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008

Currently, Kafka Streams Application Reset Tool is part of {{core}} module due 
to ZK dependency. After KIP-4 got merged, this dependency can be dropped and 
the Reset Tool can be moved to {{streams}} module.

This should also update {{InternalTopicManager#filterExistingTopics}} that 
refers to ResetTool in an exception message:
 {{"Use 'kafka.tools.StreamsResetter' tool"}}
 -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}

Doing this JIRA also requires updating the docs with regard to broker backward 
compatibility – not all brokers support "topic delete request" and thus the 
reset tool will not be backward compatible with all broker versions.

KIP-756: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-756%3A+Move+StreamsResetter+tool+outside+of+core]
 

  was:
This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008

Currently, Kafka Streams Application Reset Tool is part of {{core}} module due 
to ZK dependency. After KIP-4 got merged, this dependency can be dropped and 
the Reset Tool can be moved to {{streams}} module.

This should also update {{InternalTopicManager#filterExistingTopics}} that 
refers to ResetTool in an exception message:
{{"Use 'kafka.tools.StreamsResetter' tool"}}
-> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}

Doing this JIRA also requires updating the docs with regard to broker backward 
compatibility -- not all brokers support "topic delete request" and thus the 
reset tool will not be backward compatible with all broker versions.


> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Blocker
>  Labels: kip
> Fix For: 3.0.0
>
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}} that 
> refers to ResetTool in an exception message:
>  {{"Use 'kafka.tools.StreamsResetter' tool"}}
>  -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility – not all brokers support "topic delete request" and 
> thus the reset tool will not be backward compatible with all broker versions.
> KIP-756: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-756%3A+Move+StreamsResetter+tool+outside+of+core]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2021-06-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17360468#comment-17360468
 ] 

Matthias J. Sax commented on KAFKA-4327:


As the 3.0 KIP deadline passed today, I think we need to push this out to 4.0? – On 
the other hand, we could follow through with the KIP and just add the new class 
as proposed, keep the old one for now, and only remove it in 4.0?

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}} that 
> refers to ResetTool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support "topic delete request" and 
> thus the reset tool will not be backward compatible with all broker versions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-4327) Move Reset Tool from core to streams

2021-06-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4327:
---
Labels: kip  (was: needs-kip)

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Blocker
>  Labels: kip
> Fix For: 3.0.0
>
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}} that 
> refers to ResetTool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support "topic delete request" and 
> thus the reset tool will not be backward compatible with all broker versions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12925) prefixScan missing from intermediate interfaces

2021-06-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17360467#comment-17360467
 ] 

Matthias J. Sax commented on KAFKA-12925:
-

[~sagarrao] [~cadonna] [~guozhang] – I had a quick look into this, and it seems 
`prefixScan` is fundamentally "broken" and unusable?

We have a lot of internal "wrapper" classes that implement the base interface 
and none of them implements the new method to forward the corresponding 
calls... Seems having a default implementation hid this issue on the original 
PR...
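
To make the gap concrete, here is a hedged sketch (a hypothetical wrapper, not the 
actual {{KeyValueStoreReadWriteDecorator}}) of the forwarding override each wrapper 
would need; without it, calls fall through to the throwing default:
{code:java}
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical forwarding wrapper, for illustration only: each intermediate wrapper
// has to override prefixScan and delegate to the inner store, otherwise callers hit
// the default implementation, which throws UnsupportedOperationException.
public class ForwardingKeyValueStore<K, V> {

    private final KeyValueStore<K, V> inner;

    public ForwardingKeyValueStore(final KeyValueStore<K, V> inner) {
        this.inner = inner;
    }

    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix,
                                                                            final PS prefixKeySerializer) {
        return inner.prefixScan(prefix, prefixKeySerializer);
    }
}
{code}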

> prefixScan missing from intermediate interfaces
> ---
>
> Key: KAFKA-12925
> URL: https://issues.apache.org/jira/browse/KAFKA-12925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Michael Viamari
>Priority: Major
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] 
> introduced support for {{prefixScan}} to StateStores.
> It seems that many of the intermediate {{StateStore}} interfaces are missing 
> a definition for {{prefixScan}}, and as such it is not accessible in all cases.
> For example, when accessing the state stores through the processor context, 
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not 
> define {{prefixScan}} and it falls back to the default implementation in 
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9009) Flaky Test kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete

2021-06-09 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17360466#comment-17360466
 ] 

Luke Chen commented on KAFKA-9009:
--

[~mjsax], yes, PR is ready and waiting for the test author [~mimaison] 's 
review. Thanks.

> Flaky Test 
> kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete
> --
>
> Key: KAFKA-9009
> URL: https://issues.apache.org/jira/browse/KAFKA-9009
> Project: Kafka
>  Issue Type: Test
>  Components: core
>Affects Versions: 2.5.0, 2.6.0
>Reporter: Bill Bejeck
>Assignee: Luke Chen
>Priority: Major
>  Labels: flaky-test
>
> Failure seen in 
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/25644/testReport/junit/kafka.integration/MetricsDuringTopicCreationDeletionTest/testMetricsDuringTopicCreateDelete/]
>  
> {noformat}
> Error Message: java.lang.AssertionError: assertion failed: 
> UnderReplicatedPartitionCount not 0: 1
> Stacktrace: java.lang.AssertionError: assertion failed: 
> UnderReplicatedPartitionCount not 0: 1
>   at scala.Predef$.assert(Predef.scala:170)
>   at 
> kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete(MetricsDuringTopicCreationDeletionTest.scala:123)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>   at 

[GitHub] [kafka] dhruvilshah3 closed pull request #10388: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-09 Thread GitBox


dhruvilshah3 closed pull request #10388:
URL: https://github.com/apache/kafka/pull/10388


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dhruvilshah3 commented on pull request #10388: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-09 Thread GitBox


dhruvilshah3 commented on pull request #10388:
URL: https://github.com/apache/kafka/pull/10388#issuecomment-858171003


   Closing this PR as it's being taken forward in 
https://github.com/apache/kafka/pull/10763.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12925) prefixScan missing from intermediate interfaces

2021-06-09 Thread Michael Viamari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Viamari updated KAFKA-12925:

Summary: prefixScan missing from intermediate interfaces  (was: 
StateStore::prefixScan missing from intermediate interfaces)

> prefixScan missing from intermediate interfaces
> ---
>
> Key: KAFKA-12925
> URL: https://issues.apache.org/jira/browse/KAFKA-12925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Michael Viamari
>Priority: Major
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] 
> introduced support for {{prefixScan}} to StateStores.
> It seems that many of the intermediate {{StateStore}} interfaces are missing 
> a definition for {{prefixScan}}, and as such it is not accessible in all cases.
> For example, when accessing the state stores through the processor context, 
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not 
> define {{prefixScan}} and it falls back to the default implementation in 
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12925) StateStore::prefixScan missing from intermediate interfaces

2021-06-09 Thread Michael Viamari (Jira)
Michael Viamari created KAFKA-12925:
---

 Summary: StateStore::prefixScan missing from intermediate 
interfaces
 Key: KAFKA-12925
 URL: https://issues.apache.org/jira/browse/KAFKA-12925
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.8.0
Reporter: Michael Viamari


[KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
 and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] introduced 
support for {{prefixScan}} to StateStores.

It seems that many of the intermediate {{StateStore}} interfaces are missing a 
definition for {{prefixScan}}, and as such it is not accessible in all cases.

For example, when accessing the state stores through the processor context, 
the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not define 
{{prefixScan}} and it falls back to the default implementation in 
{{KeyValueStore}}, which throws {{UnsupportedOperationException}}.
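
For reference, a hedged sketch of the kind of access pattern that runs into this (the 
store name and types are illustrative, not taken from a real application): fetching the 
store from the processor context and calling {{prefixScan}} on it.
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Illustrative processor (hypothetical store name and types): the store returned by
// the context is wrapped in internal decorators, so this prefixScan call ends up in
// the default implementation and throws UnsupportedOperationException.
public class PrefixScanProcessor implements Processor<String, Long, String, Long> {

    private KeyValueStore<String, Long> store;

    @Override
    public void init(final ProcessorContext<String, Long> context) {
        store = context.getStateStore("counts-store");
    }

    @Override
    public void process(final Record<String, Long> record) {
        try (final KeyValueIterator<String, Long> iter =
                 store.prefixScan("user-", Serdes.String().serializer())) {
            while (iter.hasNext()) {
                iter.next(); // inspect matching entries here
            }
        }
    }
}
{code}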



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12657) Flaky Tests BlockingConnectorTest.testWorkerRestartWithBlockInConnectorStop

2021-06-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17360461#comment-17360461
 ] 

Matthias J. Sax commented on KAFKA-12657:
-

https://github.com/apache/kafka/pull/10810/checks?check_run_id=2780356245

> Flaky Tests BlockingConnectorTest.testWorkerRestartWithBlockInConnectorStop
> ---
>
> Key: KAFKA-12657
> URL: https://issues.apache.org/jira/browse/KAFKA-12657
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://github.com/apache/kafka/pull/10506/checks?check_run_id=2327377745]
> {quote} {{org.opentest4j.AssertionFailedError: Condition not met within 
> timeout 6. Worker did not complete startup in time ==> expected:  
> but was: 
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
>   at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
>   at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:319)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:367)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:316)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.integration.BlockingConnectorTest.setup(BlockingConnectorTest.java:133)}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9009) Flaky Test kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete

2021-06-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17360462#comment-17360462
 ] 

Matthias J. Sax commented on KAFKA-9009:


Any progress? Failed again: 
https://github.com/apache/kafka/pull/10810/checks?check_run_id=2780397572

> Flaky Test 
> kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete
> --
>
> Key: KAFKA-9009
> URL: https://issues.apache.org/jira/browse/KAFKA-9009
> Project: Kafka
>  Issue Type: Test
>  Components: core
>Affects Versions: 2.5.0, 2.6.0
>Reporter: Bill Bejeck
>Assignee: Luke Chen
>Priority: Major
>  Labels: flaky-test
>
> Failure seen in 
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/25644/testReport/junit/kafka.integration/MetricsDuringTopicCreationDeletionTest/testMetricsDuringTopicCreateDelete/]
>  
> {noformat}
> Error Messagejava.lang.AssertionError: assertion failed: 
> UnderReplicatedPartitionCount not 0: 1Stacktracejava.lang.AssertionError: 
> assertion failed: UnderReplicatedPartitionCount not 0: 1
>   at scala.Predef$.assert(Predef.scala:170)
>   at 
> kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete(MetricsDuringTopicCreationDeletionTest.scala:123)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>   at 

[GitHub] [kafka] mjsax commented on pull request #10846: KAFKA-12914: StreamSourceNode should return `null` topic name for pattern subscription

2021-06-09 Thread GitBox


mjsax commented on pull request #10846:
URL: https://github.com/apache/kafka/pull/10846#issuecomment-858160278


   What do you propose @cadonna ? Also happy to do a follow up PR (but not sure 
if necessary)? It's all just internal code so we can change at will. Happy to 
merge as-is, or change to `Optional` in this or a follow up PR...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10856: MINOR: Small optimizations and removal of unused code in Streams

2021-06-09 Thread GitBox


mjsax commented on a change in pull request #10856:
URL: https://github.com/apache/kafka/pull/10856#discussion_r648739553



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -294,14 +293,6 @@ public final void removeAllThreadLevelSensors(final String 
threadId) {
 return tagMap;
 }
 
-public Map bufferLevelTagMap(final String threadId,

Review comment:
   \cc @cadonna Any thoughts on this? (Similar below)

##
File path: 
streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java
##
@@ -25,8 +25,8 @@
  */
 public class StoreQueryParameters {
 
-private Integer partition;
-private boolean staleStores;
+private final Integer partition;
+private final boolean staleStores;

Review comment:
   Wondering how this is possible? Should the build not fail for this case? 
\cc @vvcephei 
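   For reference, a tiny sketch (hypothetical, simplified class) of why this compiles: a blank final field only has to be definitely assigned exactly once along every constructor path, so constructor-only fields can be marked final without any other change.

```java
class StoreQueryParametersSketch {
    private final Integer partition;
    private final boolean staleStores;

    StoreQueryParametersSketch(final Integer partition, final boolean staleStores) {
        this.partition = partition;      // assigned exactly once
        this.staleStores = staleStores;  // satisfies the definite-assignment rules for finals
    }

    Integer partition() {
        return partition;
    }

    boolean staleStores() {
        return staleStores;
    }
}
```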




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException

2021-06-09 Thread GitBox


hachikuji commented on a change in pull request #10749:
URL: https://github.com/apache/kafka/pull/10749#discussion_r648724939



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -68,15 +68,22 @@ public static Path snapshotPath(Path logDir, OffsetAndEpoch 
snapshotId) {
 return snapshotDir(logDir).resolve(filenameFromSnapshotId(snapshotId) 
+ SUFFIX);
 }
 
-public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) 
throws IOException {
+public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) {
 Path dir = snapshotDir(logDir);
+Path tempFile;
 
-// Create the snapshot directory if it doesn't exists
-Files.createDirectories(dir);
-
-String prefix = String.format("%s-", 
filenameFromSnapshotId(snapshotId));
+try {
+// Create the snapshot directory if it doesn't exists
+Files.createDirectories(dir);
 
-return Files.createTempFile(dir, prefix, PARTIAL_SUFFIX);
+String prefix = String.format("%s-", 
filenameFromSnapshotId(snapshotId));
+tempFile = Files.createTempFile(dir, prefix, PARTIAL_SUFFIX);

Review comment:
   nit: couldn't we return here and remove `tempFile`?
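   A hypothetical sketch of that early return (not the actual patch; it assumes the existing `snapshotDir()`, `filenameFromSnapshotId()` and `PARTIAL_SUFFIX` members of `Snapshots` shown in the diff, plus the usual `java.nio.file` and `java.io` imports):

```java
public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) {
    Path dir = snapshotDir(logDir);

    try {
        // Create the snapshot directory if it doesn't exist
        Files.createDirectories(dir);

        String prefix = String.format("%s-", filenameFromSnapshotId(snapshotId));
        return Files.createTempFile(dir, prefix, PARTIAL_SUFFIX); // early return, no local needed
    } catch (IOException e) {
        throw new UncheckedIOException(
            String.format("Error creating temporary snapshot file, logDir = %s, snapshotId = %s",
                dir.toAbsolutePath(), snapshotId), e);
    }
}
```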

##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -78,7 +85,11 @@ public void append(MemoryRecords records) {
 checkIfFrozen("Append");
 Utils.writeFully(channel, records.buffer());
 } catch (IOException e) {
-throw new RuntimeException(e);
+throw new UncheckedIOException(
+String.format("Error writing file snapshot," +

Review comment:
   nit: missing a space after the comma. Also, it would be nice to be 
consistent with the use of comma vs period for the similar cases in this file




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10588: KAFKA-12662: add unit test for ProducerPerformance

2021-06-09 Thread GitBox


chia7712 commented on a change in pull request #10588:
URL: https://github.com/apache/kafka/pull/10588#discussion_r648710542



##
File path: 
tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
##
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Callback;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+public class ProducerPerformanceTest {
+
+@Mock
+KafkaProducer producerMock;
+
+@Spy
+ProducerPerformance producerPerformanceSpy;
+
+private File createTempFile(String contents) throws IOException {
+File file = File.createTempFile("ProducerPerformanceTest", ".tmp");
+file.deleteOnExit();
+final FileWriter writer = new FileWriter(file);
+writer.write(contents);
+writer.close();
+return file;
+}
+
+@Test
+public void testReadPayloadFile() throws Exception {
+File payloadFile = createTempFile("Hello\nKafka");
+String payloadFilePath = payloadFile.getAbsolutePath();
+String payloadDelimiter = "\n";
+
+List payloadByteList = 
ProducerPerformance.readPayloadFile(payloadFilePath, payloadDelimiter);
+
+assertEquals(2, payloadByteList.size());
+assertEquals("Hello", new String(payloadByteList.get(0)));
+assertEquals("Kafka", new String(payloadByteList.get(1)));
+}
+
+@Test
+public void testReadProps() throws Exception {
+
+List producerProps = 
Collections.singletonList("bootstrap.servers=localhost:9000");
+String producerConfig = createTempFile("acks=1").getAbsolutePath();
+String transactionalId = "1234";
+boolean transactionsEnabled = true;
+
+Properties prop = ProducerPerformance.readProps(producerProps, 
producerConfig, transactionalId, transactionsEnabled);
+
+assertNotNull(prop);
+assertEquals(5, prop.size());
+}
+
+@Test
+public void testNumberOfCallsForSendAndClose() throws IOException {
+
+doReturn(null).when(producerMock).send(any(), 
ArgumentMatchers.any());
+
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
+
+String[] args = new String[] {"--topic", "Hello-Kafka", 
"--num-records", "5", "--throughput", "100", "--record-size", "100", 
"--producer-props", "bootstrap.servers=localhost:9000"};
+producerPerformanceSpy.start(args);
+verify(producerMock, times(5)).send(any(), 
ArgumentMatchers.any());

Review comment:
   `<Callback>`: this explicit type is not necessary. 

##
File path: 
tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
##
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the 

[jira] [Resolved] (KAFKA-12874) Increase default consumer session timeout to 45s (KIP-735)

2021-06-09 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12874.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

> Increase default consumer session timeout to 45s (KIP-735)
> --
>
> Key: KAFKA-12874
> URL: https://issues.apache.org/jira/browse/KAFKA-12874
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.0.0
>
>
> As documented in KIP-735, we will increase the default session timeout to 
> 45s: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-735%3A+Increase+default+consumer+session+timeout.
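
For illustration only, a minimal sketch of pinning the session timeout explicitly on a consumer (`ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG` is the real config key; the bootstrap server and group id below are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class SessionTimeoutConfigSketch {
    public static Properties consumerProps() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        // 45000 ms matches the new default from KIP-735; setting it explicitly
        // keeps the behaviour stable across client upgrades.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");
        return props;
    }
}
```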



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #10803: KAFKA-12874; Increase default consumer session timeout to 45s

2021-06-09 Thread GitBox


hachikuji merged pull request #10803:
URL: https://github.com/apache/kafka/pull/10803


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #10803: KAFKA-12874; Increase default consumer session timeout to 45s

2021-06-09 Thread GitBox


hachikuji commented on pull request #10803:
URL: https://github.com/apache/kafka/pull/10803#issuecomment-858133566


   I'm going to go ahead and merge this. I have been having trouble getting a 
good build, but that does not seem unique to the PR. The test failures in the 
recent build appear to be known flaky tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-09 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r648132657



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -246,17 +262,17 @@ object LogLoader extends Logging {
 return fn
   } catch {
 case e: LogSegmentOffsetOverflowException =>
-  info(s"${params.logIdentifier}Caught segment overflow error: 
${e.getMessage}. Split segment and retry.")
-  Log.splitOverflowedSegment(
+  info(s"${params.logIdentifier} Caught segment overflow error: 
${e.getMessage}. Split segment and retry.")
+  val result = Log.splitOverflowedSegment(
 e.segment,
 params.segments,
 params.dir,
 params.topicPartition,
 params.config,
 params.scheduler,
 params.logDirFailureChannel,
-params.producerStateManager,
 params.logIdentifier)
+  deleteProducerSnapshotsAsync(result.deletedSegments, params)

Review comment:
   I've created a jira to track this. 
https://issues.apache.org/jira/browse/KAFKA-12923




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao merged pull request #10829: MINOR Removed unused ConfigProvider from raft resources module.

2021-06-09 Thread GitBox


junrao merged pull request #10829:
URL: https://github.com/apache/kafka/pull/10829


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10848: MINOR Updated transaction index as optional in LogSegmentData.

2021-06-09 Thread GitBox


junrao commented on a change in pull request #10848:
URL: https://github.com/apache/kafka/pull/10848#discussion_r648690860



##
File path: 
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/LogSegmentData.java
##
@@ -33,31 +34,32 @@
 private final Path logSegment;
 private final Path offsetIndex;
 private final Path timeIndex;
-private final Path txnIndex;
+private final Optional transactionIndex;
 private final Path producerSnapshotIndex;
 private final ByteBuffer leaderEpochIndex;
 
 /**
  * Creates a LogSegmentData instance with data and indexes.
- *  @param logSegmentactual log segment file
+ *
+ * @param logSegmentactual log segment file
  * @param offsetIndex   offset index file
  * @param timeIndex time index file
- * @param txnIndex  transaction index file
+ * @param transactionIndex  transaction index file, which can be null
  * @param producerSnapshotIndex producer snapshot until this segment
  * @param leaderEpochIndex  leader-epoch-index until this segment
  */
 public LogSegmentData(Path logSegment,
   Path offsetIndex,
   Path timeIndex,
-  Path txnIndex,
+  Path transactionIndex,

Review comment:
   Could we make transactionIndex Optional to make it clear?
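   A hypothetical sketch of the Optional-based shape (simplified stand-in, not the actual class):

```java
import java.nio.file.Path;
import java.util.Objects;
import java.util.Optional;

// Simplified stand-in for LogSegmentData, illustration only.
class LogSegmentDataSketch {
    private final Path logSegment;
    private final Optional<Path> transactionIndex;

    LogSegmentDataSketch(final Path logSegment, final Optional<Path> transactionIndex) {
        this.logSegment = Objects.requireNonNull(logSegment, "logSegment can not be null");
        // Callers pass Optional.empty() instead of null when the segment has no transaction index.
        this.transactionIndex = Objects.requireNonNull(transactionIndex, "transactionIndex can not be null");
    }

    Optional<Path> transactionIndex() {
        return transactionIndex;
    }
}
```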




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-09 Thread GitBox


junrao commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r648679167



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1812,37 +1570,39 @@ class Log(@volatile private var _dir: File,
 endOffset: Long
   ): Unit = {
 logStartOffset = startOffset
-nextOffsetMetadata = LogOffsetMetadata(endOffset, 
activeSegment.baseOffset, activeSegment.size)
-recoveryPoint = math.min(recoveryPoint, endOffset)
+localLog.updateLogEndOffset(endOffset)
 rebuildProducerState(endOffset, producerStateManager)
-updateHighWatermark(math.min(highWatermark, endOffset))
+if (highWatermark < localLog.logEndOffset)

Review comment:
   If highWatermark is smaller, there is no need to update high watermark.

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1788,17 +1554,9 @@ class Log(@volatile private var _dir: File,
 maybeHandleIOException(s"Error while truncating the entire log for 
$topicPartition in dir ${dir.getParent}") {
   debug(s"Truncate and start at offset $newOffset")
   lock synchronized {
-checkIfMemoryMappedBufferClosed()
-removeAndDeleteSegments(logSegments, asyncDelete = true, LogTruncation)
-addSegment(LogSegment.open(dir,
-  baseOffset = newOffset,
-  config = config,
-  time = time,
-  initFileSize = config.initFileSize,
-  preallocate = config.preallocate))
+localLog.truncateFullyAndStartAt(newOffset)

Review comment:
   This is an existing issue. In this case, it seems that we should always 
update high watermark in completeTruncation() with 
localLog.logEndOffsetMetadata.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12920) Consumer's cooperative sticky assignor need to clear generation / assignment data upon `onPartitionsLost`

2021-06-09 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17360328#comment-17360328
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12920:


While there does seem to be a possible issue with the cooperative-sticky 
assignor, I don't believe we've found it yet. It's not expected that the 
cooperative-sticky assignor would have cleared the `memberAssignment` and 
`generation` since only the plain sticky assignor uses that stored info. The 
cooperative-sticky assignor gets the partitions for the previous assignment not 
from the `memberAssignment` but instead directly from the SubscriptionState. 
And this _should_ be cleared anytime `onPartitionsLost` is invoked.

However we can keep this ticket open for now to track the investigation into 
the cooperative-sticky assignor. For some context, we've seen a report of 
continuous rebalancing with JoinGroup requests that seem to encode the same 
partition in the previous assignment of two consumers. It's not clear how this 
situation arose, but once we have these initial conditions to the 
cooperative-sticky assignor, it will detect an issue and throw an 
IllegalStateException.

At the very least we should definitely improve the assignor to check for this 
condition and handle it by invalidating those previous assignments, rather than 
just throwing an exception repeatedly. But what's still unclear is how we got 
these conditions to begin with – it doesn't seem possible for the assignor to 
have produced an assignment with double partition ownership, as it would have 
thrown this IllegalStateException. That's what still needs to be investigated, 
and what this report was hoping to have uncovered
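
As a purely illustrative sketch of the "invalidate rather than throw" idea (hypothetical helper, not the actual assignor code), conflicting claims could be resolved by keeping the claim from the newest generation:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OwnedPartitionValidator {

    // partition -> consumer that keeps the claim; stale claims from older generations are dropped
    static Map<String, String> resolveOwners(final Map<String, List<String>> ownedByConsumer,
                                             final Map<String, Integer> generationByConsumer) {
        final Map<String, String> ownerByPartition = new HashMap<>();
        final Map<String, Integer> ownerGeneration = new HashMap<>();
        for (final Map.Entry<String, List<String>> entry : ownedByConsumer.entrySet()) {
            final String consumer = entry.getKey();
            final int generation = generationByConsumer.getOrDefault(consumer, -1);
            for (final String partition : entry.getValue()) {
                final Integer existing = ownerGeneration.get(partition);
                if (existing == null || generation > existing) {
                    ownerByPartition.put(partition, consumer);
                    ownerGeneration.put(partition, generation);
                }
            }
        }
        return ownerByPartition;
    }
}
```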

> Consumer's cooperative sticky assignor need to clear generation / assignment 
> data upon `onPartitionsLost`
> -
>
> Key: KAFKA-12920
> URL: https://issues.apache.org/jira/browse/KAFKA-12920
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: bug, consumer
>
> Consumer's cooperative-sticky assignor does not track the owned partitions 
> inside the assignor --- i.e. when it reset its state in event of 
> ``onPartitionsLost``, the ``memberAssignment`` and ``generation`` inside the 
> assignor would not be cleared. This would cause a member to join with empty 
> generation on the protocol while with non-empty user-data encoding the old 
> assignment still (and hence pass the validation check on broker side during 
> JoinGroup), and eventually cause a single partition to be assigned to 
> multiple consumers within a generation.
> We should let the assignor to also clear its assignment/generation when 
> ``onPartitionsLost`` is triggered in order to avoid this scenario.
> Note that 1) for the regular sticky assignor the generation would still have 
> an older value, and this would cause the previously owned partitions to be 
> discarded during the assignment, and 2) for Streams' sticky assignor, it’s 
> encoding would indeed be cleared along with ``onPartitionsLost``. Hence only 
> Consumer's cooperative-sticky assignor have this issue to solve.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2021-06-09 Thread Victor Garcia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17360281#comment-17360281
 ] 

Victor Garcia commented on KAFKA-8335:
--

[~francisco.juan] we are having the exact same problem as you do. Did you find 
a way to fix this? We upgraded to 2.3 but like in your case, we have a few big 
partitions with tons of "isTransactional: true" messages.

Any help would be really appreciated. Thanks

> Log cleaner skips Transactional mark and batch record, causing unlimited 
> growth of __consumer_offsets
> -
>
> Key: KAFKA-8335
> URL: https://issues.apache.org/jira/browse/KAFKA-8335
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
>Reporter: Boquan Tang
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.0.2, 2.1.2, 2.2.1
>
> Attachments: seg_april_25.zip, segment.zip
>
>
> My Colleague Weichu already sent out a mail to kafka user mailing list 
> regarding this issue, but we think it's worth having a ticket tracking it.
> We are using Kafka Streams with exactly-once enabled on a Kafka cluster for
> a while.
> Recently we found that the size of __consumer_offsets partitions grew huge.
> Some partition went over 30G. This caused Kafka to take quite long to load
> "__consumer_offsets" topic on startup (it loads the topic in order to
> become group coordinator).
> We dumped the __consumer_offsets segments and found that while normal
> offset commits are nicely compacted, transaction records (COMMIT, etc) are
> all preserved. Looks like that since these messages don't have a key, the
> LogCleaner is keeping them all:
> --
> $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /003484332061.log --key-decoder-class
> kafka.serializer.StringDecoder 2>/dev/null | cat -v | head
> Dumping 003484332061.log
> Starting offset: 3484332061
> offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006
> producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 81
> offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005
> producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 84
> ...
> --
> Streams is doing transaction commits per 100ms (commit.interval.ms=100 when
> exactly-once) so the __consumer_offsets is growing really fast.
> Is this (to keep all transactions) by design, or is that a bug for
> LogCleaner?  What would be the way to clean up the topic?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2021-06-09 Thread Victor Garcia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17360281#comment-17360281
 ] 

Victor Garcia edited comment on KAFKA-8335 at 6/9/21, 6:47 PM:
---

[~francisco.juan] we are having the exact same problem as you had. Did you find 
a way to fix this? We upgraded to 2.3 but like in your case, we have a few big 
partitions with tons of "isTransactional: true" messages.

Any help would be really appreciated. Thanks


was (Author: victorgp):
[~francisco.juan] we are having the exact same problem as you do. Did you find 
a way to fix this? We upgraded to 2.3 but like in your case, we have a few big 
partitions with tons of "isTransactional: true" messages.

Any help would be really appreciated. Thanks

> Log cleaner skips Transactional mark and batch record, causing unlimited 
> growth of __consumer_offsets
> -
>
> Key: KAFKA-8335
> URL: https://issues.apache.org/jira/browse/KAFKA-8335
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
>Reporter: Boquan Tang
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.0.2, 2.1.2, 2.2.1
>
> Attachments: seg_april_25.zip, segment.zip
>
>
> My Colleague Weichu already sent out a mail to kafka user mailing list 
> regarding this issue, but we think it's worth having a ticket tracking it.
> We are using Kafka Streams with exactly-once enabled on a Kafka cluster for
> a while.
> Recently we found that the size of __consumer_offsets partitions grew huge.
> Some partition went over 30G. This caused Kafka to take quite long to load
> "__consumer_offsets" topic on startup (it loads the topic in order to
> become group coordinator).
> We dumped the __consumer_offsets segments and found that while normal
> offset commits are nicely compacted, transaction records (COMMIT, etc) are
> all preserved. Looks like that since these messages don't have a key, the
> LogCleaner is keeping them all:
> --
> $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /003484332061.log --key-decoder-class
> kafka.serializer.StringDecoder 2>/dev/null | cat -v | head
> Dumping 003484332061.log
> Starting offset: 3484332061
> offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006
> producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 81
> offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005
> producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 84
> ...
> --
> Streams is doing transaction commits per 100ms (commit.interval.ms=100 when
> exactly-once) so the __consumer_offsets is growing really fast.
> Is this (to keep all transactions) by design, or is that a bug for
> LogCleaner?  What would be the way to clean up the topic?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-09 Thread GitBox


mimaison commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r648552780



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
##
@@ -204,7 +206,8 @@ private void completeLookup(Map 
brokerIdMapping) {
 public void onResponse(
 long currentTimeMs,
 RequestSpec spec,
-AbstractResponse response
+AbstractResponse response,
+Node node

Review comment:
   Good catch!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-7342) Migrate streams modules to JUnit 5

2021-06-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7342:
---
Component/s: unit tests

> Migrate streams modules to JUnit 5
> --
>
> Key: KAFKA-7342
> URL: https://issues.apache.org/jira/browse/KAFKA-7342
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Ismael Juma
>Assignee: Chia-Ping Tsai
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12924) Replace EasyMock and PowerMock with Mockito in streams module(metrics)

2021-06-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12924:

Component/s: unit tests
 streams

> Replace EasyMock and PowerMock with Mockito in streams module(metrics)
> --
>
> Key: KAFKA-12924
> URL: https://issues.apache.org/jira/browse/KAFKA-12924
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: YI-CHEN WANG
>Assignee: YI-CHEN WANG
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] Moovlin commented on a change in pull request #10853: KAFKA-12811: kafka-topics.sh should let the user know they cannot adjust the replication factor for a topic using the --alter fl

2021-06-09 Thread GitBox


Moovlin commented on a change in pull request #10853:
URL: https://github.com/apache/kafka/pull/10853#discussion_r648550375



##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -574,6 +574,7 @@ object TopicCommand extends Logging {
   if (has(alterOpt)) {
 CommandLineUtils.checkInvalidArgsSet(parser, options, 
Set(bootstrapServerOpt, configOpt), Set(alterOpt),
 Some(kafkaConfigsCanAlterTopicConfigsViaBootstrapServer))
+CommandLineUtils.checkInvalidArgs(parser, options, 
replicationFactorOpt, Set(alterOpt))

Review comment:
   I realize that I pushed a change in my branch already which doesn't fix 
this as I expected (my testing procedure was incorrect). Thinking about this 
change more, though: should it be that replication-factor cannot be used with 
replica-assignment? As I understand it, replica-assignment is the accepted 
way to adjust the replication factor for a particular topic, and this just 
exposes a more convenient way to make that change. Please correct my 
understanding if it's wrong. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-09 Thread GitBox


mimaison commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r648537766



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##
@@ -17,20 +17,34 @@
 package org.apache.kafka.clients.admin.internals;
 
 import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class CoordinatorStrategy implements 
AdminApiLookupStrategy {
+
+private static final ApiRequestScope GROUP_REQUEST_SCOPE = new 
ApiRequestScope() { };
+private static final ApiRequestScope TXN_REQUEST_SCOPE = new 
ApiRequestScope() { };
+
 private final Logger log;
+private boolean batch = true;
+private FindCoordinatorRequest.CoordinatorType type;

Review comment:
   That's a good idea, yes




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wycccccc commented on a change in pull request #10850: KAFKA-12924 Replace EasyMock and PowerMock with Mockito in streams mo…

2021-06-09 Thread GitBox


wycccccc commented on a change in pull request #10850:
URL: https://github.com/apache/kafka/pull/10850#discussion_r648535313



##
File path: 
streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
##
@@ -125,8 +114,8 @@ public void shouldAddAliveStreamThreadsMetric() {
 public void shouldGetFailedStreamThreadsSensor() {
 final String name = "failed-stream-threads";
 final String description = "The number of failed stream threads since 
the start of the Kafka Streams client";
-expect(streamsMetrics.clientLevelSensor(name, 
RecordingLevel.INFO)).andReturn(expectedSensor);
-expect(streamsMetrics.clientLevelTagMap()).andReturn(tagMap);
+Mockito.when(streamsMetrics.clientLevelSensor(name, 
RecordingLevel.INFO)).thenReturn(expectedSensor);
+Mockito.when(streamsMetrics.clientLevelTagMap()).thenReturn(tagMap);

Review comment:
   In the case of mockito only, I also think the static import is better. 
Already changed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wycccccc commented on a change in pull request #10850: KAFKA-12924 Replace EasyMock and PowerMock with Mockito in streams mo…

2021-06-09 Thread GitBox


wycccccc commented on a change in pull request #10850:
URL: https://github.com/apache/kafka/pull/10850#discussion_r648533278



##
File path: 
streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
##
@@ -21,39 +21,28 @@
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.Collections;
 import java.util.Map;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_LEVEL_GROUP;
 import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.powermock.api.easymock.PowerMock.createMock;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replay;
-import static org.powermock.api.easymock.PowerMock.verify;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({StreamsMetricsImpl.class, Sensor.class, ClientMetrics.class})
+@RunWith(MockitoJUnitRunner.class)

Review comment:
   I did not find this detail, thanks for the reminder. It has been 
modified.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wycccccc commented on a change in pull request #10850: KAFKA-12924 Replace EasyMock and PowerMock with Mockito in streams mo…

2021-06-09 Thread GitBox


wycccccc commented on a change in pull request #10850:
URL: https://github.com/apache/kafka/pull/10850#discussion_r648532511



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
##
@@ -18,51 +18,38 @@
 
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
-import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
-import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.Collections;
 import java.util.Map;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP;
-import static org.easymock.EasyMock.expect;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.powermock.api.easymock.PowerMock.createMock;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replay;
-import static org.powermock.api.easymock.PowerMock.verify;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({StreamsMetricsImpl.class, Sensor.class, ThreadMetrics.class, 
StateStoreMetrics.class, ProcessorNodeMetrics.class})
+@RunWith(MockitoJUnitRunner.class)
 public class TaskMetricsTest {
 
 private final static String THREAD_ID = "test-thread";
 private final static String TASK_ID = "test-task";
 
-private final StreamsMetricsImpl streamsMetrics = 
createMock(StreamsMetricsImpl.class);
-private final Sensor expectedSensor = createMock(Sensor.class);
+private final StreamsMetricsImpl streamsMetrics = 
Mockito.mock(StreamsMetricsImpl.class);
+private final Sensor expectedSensor = Mockito.mock(Sensor.class);
 private final Map tagMap = 
Collections.singletonMap("hello", "world");
 
-@Before
-public void setUp() {
-expect(streamsMetrics.version()).andStubReturn(Version.LATEST);
-mockStatic(StreamsMetricsImpl.class);

Review comment:
   In Mockito we can directly define static method behaviour, so these are no longer 
needed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wycccccc commented on a change in pull request #10850: KAFKA-12924 Replace EasyMock and PowerMock with Mockito in streams mo…

2021-06-09 Thread GitBox


wycccccc commented on a change in pull request #10850:
URL: https://github.com/apache/kafka/pull/10850#discussion_r648531262



##
File path: 
streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
##
@@ -135,45 +124,38 @@ public void shouldGetFailedStreamThreadsSensor() {
 false,
 description
 );
-replay(StreamsMetricsImpl.class, streamsMetrics);
 
 final Sensor sensor = 
ClientMetrics.failedStreamThreadSensor(streamsMetrics);
-
-verify(StreamsMetricsImpl.class, streamsMetrics);

Review comment:
   I think `PowerMock.verify()` is used to verify whether 
`PowerMock.replay()` has been executed.
   The `replay()` method is not needed in mockito, so there is no need to verify it.
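   A small sketch of the difference, reusing the `StreamsMetricsImpl` mock and `clientLevelTagMap()` from the diff above (simplified test scaffolding assumed):

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

class ReplayVsVerifySketch {
    void example() {
        final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class);
        final Map<String, String> tagMap = Collections.singletonMap("hello", "world");

        // Mockito mocks are live immediately: stubbing needs no replay() phase...
        when(streamsMetrics.clientLevelTagMap()).thenReturn(tagMap);

        // ...the call below stands in for the code under test...
        final Map<String, String> tags = streamsMetrics.clientLevelTagMap();

        // ...and verification is per interaction, not a global verify() of recorded expectations.
        verify(streamsMetrics).clientLevelTagMap();
    }
}
```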




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-09 Thread GitBox


tombentley commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r648530700



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1305,66 +1305,102 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
+val version = request.header.apiVersion
+if (version < 4) {
+  handleFindCoordinatorRequestLessThanV4(request)
+} else {
+  handleFindCoordinatorRequestV4AndAbove(request)

Review comment:
   Yes, obvious now that you point it out! I guess I can't see a better way of 
abstracting over that. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-09 Thread GitBox


mimaison commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r648526428



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
##
@@ -250,6 +254,13 @@ public void onFailure(
 .filter(future.lookupKeys()::contains)
 .collect(Collectors.toSet());
 retryLookup(keysToUnmap);
+
+} else if (t instanceof UnsupportedBatchLookupException) {
+((CoordinatorStrategy) handler.lookupStrategy()).disableBatch();

Review comment:
   Right, `UnsupportedBatchLookupException` is not a great name!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-09 Thread GitBox


mimaison commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r648525658



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorKey.java
##
@@ -24,7 +24,7 @@
 public final String idValue;
 public final FindCoordinatorRequest.CoordinatorType type;
 
-private CoordinatorKey(String idValue, 
FindCoordinatorRequest.CoordinatorType type) {
+public CoordinatorKey(String idValue, 
FindCoordinatorRequest.CoordinatorType type) {

Review comment:
   I agree, since there are very few callers, I'll make the change




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan edited a comment on pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-06-09 Thread GitBox


jolshan edited a comment on pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#issuecomment-857891409


   all 3 jdk versions are failing with something like this:
   ```
Execution failed for task ':core:integrationTest'.
   > Process 'Gradle Test Executor 127' finished with non-zero exit value 1
   This problem might be caused by incorrect test process configuration.
   ```
   They build locally, so I'm not sure what's up with Jenkins. Been seeing this 
with at least one jdk build failing in many PRs in this repo. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-06-09 Thread GitBox


jolshan commented on pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#issuecomment-857891409


   all 3 jdk versions are failing with something like this:
   ```
Execution failed for task ':core:integrationTest'.
   > Process 'Gradle Test Executor 127' finished with non-zero exit value 1
   This problem might be caused by incorrect test process configuration.
   ```
   They build locally, so I'm not sure what's up with Jenkins. Been seeing this 
with many PRs in this repo. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-09 Thread GitBox


mimaison commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r648522353



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1305,66 +1305,102 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
+val version = request.header.apiVersion
+if (version < 4) {
+  handleFindCoordinatorRequestLessThanV4(request)
+} else {
+  handleFindCoordinatorRequestV4AndAbove(request)

Review comment:
   We need a different `createResponse()` for each version. Both methods 
call `getCoordinator()` and have no logic.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-09 Thread GitBox


mimaison commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r648520281



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
##
@@ -267,7 +278,11 @@ public void onFailure(
 private void clearInflightRequest(long currentTimeMs, RequestSpec spec) 
{
 RequestState requestState = requestStates.get(spec.scope);
 if (requestState != null) {
-requestState.clearInflight(currentTimeMs);
+if (spec.scope instanceof FulfillmentScope) {
+requestState.clearInflight(currentTimeMs + retryBackoffMs);
+} else {
+requestState.clearInflight(currentTimeMs);

Review comment:
   That surprised me too. It is the current behaviour that 
all`*RetryBackoff` tests in `KafkaAdminClientTest` enforce




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-09 Thread GitBox


mimaison commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r648519181



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##
@@ -40,61 +54,116 @@ public CoordinatorStrategy(
 
 @Override
 public ApiRequestScope lookupScope(CoordinatorKey key) {
-// The `FindCoordinator` API does not support batched lookups, so we 
use a
-// separate lookup context for each coordinator key we need to lookup
-return new LookupRequestScope(key);
+if (batch) {
+if (key.type == CoordinatorType.GROUP) {
+return GROUP_REQUEST_SCOPE;
+} else {
+return TXN_REQUEST_SCOPE;
+}
+} else {
+// If the `FindCoordinator` API does not support batched lookups, 
we use a
+// separate lookup context for each coordinator key we need to 
lookup
+return new LookupRequestScope(key);
+}
 }
 
 @Override
 public FindCoordinatorRequest.Builder buildRequest(Set 
keys) {
-CoordinatorKey key = requireSingleton(keys);
-return new FindCoordinatorRequest.Builder(
-new FindCoordinatorRequestData()
-.setKey(key.idValue)
-.setKeyType(key.type.id())
-);
+unrepresentableKeys = keys.stream().filter(k -> 
!isRepresentableKey(k.idValue)).collect(Collectors.toSet());
+keys = keys.stream().filter(k -> 
isRepresentableKey(k.idValue)).collect(Collectors.toSet());
+if (batch) {
+keys = requireSameType(keys);
+type = keys.iterator().next().type;

Review comment:
   Yes, `requireSameType` ensures there is one type.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on a change in pull request #10826: KAFKA-7632: Support Compression Level

2021-06-09 Thread GitBox


tombentley commented on a change in pull request #10826:
URL: https://github.com/apache/kafka/pull/10826#discussion_r648485713



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
##
@@ -183,8 +183,8 @@ public double compressionRatio() {
 return actualCompressionRatio;
 }
 
-public CompressionType compressionType() {
-return compressionType;
+public CompressionConfig compressionConfing() {

Review comment:
   typo, it should be `compressionConfig()`

##
File path: 
clients/src/main/java/org/apache/kafka/common/record/CompressionConfig.java
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class CompressionConfig {
+private final CompressionType type;
+private final Integer level;
+
+public static CompressionConfig none() {
+return of(CompressionType.NONE);
+}
+
+public static CompressionConfig of(final CompressionType type) {
+return of(Objects.requireNonNull(type), null);
+}
+
+public static CompressionConfig of(final CompressionType type, final 
Integer level) {
+return new CompressionConfig(Objects.requireNonNull(type), level);
+}
+
+private CompressionConfig(final CompressionType type, final Integer level) 
{
+this.type = type;
+
+if (level != null && !type.isValidLevel(level.intValue())) {
+throw new IllegalArgumentException("Illegal level " + level + " 
for compression codec " + type.name);

Review comment:
   Should the message include the valid levels?
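
   One hedged possibility, illustrative only (`minLevel()`/`maxLevel()` are assumed helper accessors introduced for the example, not existing `CompressionType` API):

   ```java
   // Illustrative only: build an error message that also lists the valid level range.
   // minLevel()/maxLevel() are hypothetical accessors used purely for the sake of this sketch.
   private static String illegalLevelMessage(final CompressionType type, final int level) {
       return "Illegal level " + level + " for compression codec " + type.name
           + ", valid levels: [" + type.minLevel() + ", ..., " + type.maxLevel() + "]";
   }
   ```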

##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -933,6 +934,9 @@ object KafkaConfig {
   val CompressionTypeDoc = "Specify the final compression type for a given 
topic. This configuration accepts the standard compression codecs " +
   "('gzip', 'snappy', 'lz4', 'zstd'). It additionally accepts 'uncompressed' 
which is equivalent to no compression; and " +
   "'producer' which means retain the original compression codec set by the 
producer."
+  val CompressionLevelDoc = "The compression level for all data generated by 
the producer. The default level and valid value is up to " +
+"compression.type. (none, snappy: not available. 
gzip: 1~9. lz4: 1~17. " +
+"zstd: -131072~22."

Review comment:
   Since the docs use `[min,...,max]` syntax for things validated by 
`ConfigDef.Range` I think we should use a similar syntax here for consistency 
(even though it's not validated by `Range`).
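
   For illustration only, the level portion of the doc could then read something like: "gzip: [1,...,9], lz4: [1,...,17], zstd: [-131072,...,22]; none/snappy: not applicable" (wording is just a suggestion, not the PR's actual text).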




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745) (WIP)

2021-06-09 Thread GitBox


kpatelatwork commented on pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#issuecomment-857875970


   Below are some logs when both connector and tasks are restarted.
   
   > [2021-06-09 12:02:26,045] DEBUG Writing 
RestartRequest{connectorName='simple-source', onlyFailed=false, 
includeTasks=true} to Kafka 
(org.apache.kafka.connect.storage.KafkaConfigBackingStore:487)
   [2021-06-09 12:02:26,049] INFO [Worker clientId=connect-1, 
groupId=connect-integration-test-connect-cluster] Received 
RestartRequest{connectorName='simple-source', onlyFailed=false, 
includeTasks=true} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1709)
   [2021-06-09 12:02:26,051] INFO [Worker clientId=connect-1, 
groupId=connect-integration-test-connect-cluster] Executing plan to restart 
connector and 4 of 4 tasks for RestartRequest{connectorName='simple-source', 
onlyFailed=false, includeTasks=true} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1137)
   [2021-06-09 12:02:26,063] DEBUG [Worker clientId=connect-1, 
groupId=connect-integration-test-connect-cluster] Restarting 4 of 4 tasks for 
RestartRequest{connectorName='simple-source', onlyFailed=false, 
includeTasks=true} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1169)
   [2021-06-09 12:02:26,114] DEBUG [Worker clientId=connect-1, 
groupId=connect-integration-test-connect-cluster] Restarted 4 of 4 tasks for 
RestartRequest{connectorName='simple-source', onlyFailed=false, 
includeTasks=true} as requested 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1171)
   [2021-06-09 12:02:26,114] INFO [Worker clientId=connect-1, 
groupId=connect-integration-test-connect-cluster] Completed plan to restart 
connector and 4 of 4 tasks for RestartRequest{connectorName='simple-source', 
onlyFailed=false, includeTasks=true} 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1173)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10856: MINOR: Small optimizations and removal of unused code in Streams

2021-06-09 Thread GitBox


jlprat commented on pull request #10856:
URL: https://github.com/apache/kafka/pull/10856#issuecomment-857842621


   Pinging @mjsax on this one as you did some changes on Streams recently. 
Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10855: MINOR: clean up unneeded `@SuppressWarnings` on Streams module

2021-06-09 Thread GitBox


jlprat commented on pull request #10855:
URL: https://github.com/apache/kafka/pull/10855#issuecomment-857841468


   cc. @mjsax as you were doing a PR touching similar things recently, maybe 
you want to review this one as well?
   Also pinging @vvcephei as he was reviewing that PR I was referring to.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on a change in pull request #10856: MINOR: Small optimizations and removal of unused code in Streams

2021-06-09 Thread GitBox


jlprat commented on a change in pull request #10856:
URL: https://github.com/apache/kafka/pull/10856#discussion_r648463106



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
##
@@ -63,10 +63,6 @@
 return joinMergeProcessorParameters;
 }
 
-ValueJoinerWithKey 
valueJoiner() {

Review comment:
   As it is an internal class and this method is not used in the code base, I assume it is safe to delete.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
##
@@ -39,9 +39,6 @@ public KP getPrimaryKey() {
 }
 
 public boolean equals(final KF foreignKey, final KP primaryKey) {
-if (this.primaryKey == null) {

Review comment:
   This is removed because `primaryKey` is guaranteed in the constructor to never be null.
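
   For context, the guarantee referred to is the usual constructor-level null check; a minimal sketch of that pattern (assumed shape, not necessarily the exact `CombinedKey` constructor):

   ```java
   // Sketch of a constructor-level non-null guarantee (uses java.util.Objects).
   CombinedKey(final KF foreignKey, final KP primaryKey) {
       this.foreignKey = Objects.requireNonNull(foreignKey, "foreignKey can't be null");
       this.primaryKey = Objects.requireNonNull(primaryKey, "primaryKey can't be null");
   }
   ```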

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -294,14 +293,6 @@ public final void removeAllThreadLevelSensors(final String 
threadId) {
 return tagMap;
 }
 
-public Map bufferLevelTagMap(final String threadId,

Review comment:
   This public function on an internal class is never used in our code 
base, so I assume it is safe to delete.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
##
@@ -59,11 +59,11 @@ public void prepareTopology() throws InterruptedException {
 rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.>as("right").withLoggingDisabled());
 }
 
-final private TestRecord expectedFinalJoinResult = new 
TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L);
-final private TestRecord expectedFinalMultiJoinResult = new 
TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null,  15L);
-final private String storeName = appID + "-store";
+private final TestRecord expectedFinalJoinResult = new 
TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L);

Review comment:
   Just using the `private final` order consistently

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -562,10 +553,6 @@ private String externalChildSensorName(final String 
threadId, final String opera
 + SENSOR_NAME_DELIMITER + operationName;
 }
 
-private String externalParentSensorName(final String threadId, final 
String operationName) {

Review comment:
   This private function was never used

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -246,15 +245,15 @@ public final void removeAllClientLevelSensorsAndMetrics() 
{
 removeAllClientLevelMetrics();
 }
 
-private final void removeAllClientLevelMetrics() {

Review comment:
   `private` combined with `final` is redundant here, since private methods cannot be overridden anyway




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat opened a new pull request #10856: MINOR: Small optimizations and removal of unused code in Streams

2021-06-09 Thread GitBox


jlprat opened a new pull request #10856:
URL: https://github.com/apache/kafka/pull/10856


   Remove unused methods in internal classes
   Mark fields that can be final as final
   Remove unneeded generic type annotation
   Convert single use fields to local final variables
   Use method reference in lambdas when it's more readable
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat opened a new pull request #10855: MINOR: clean up unneeded `@SuppressWarnings`

2021-06-09 Thread GitBox


jlprat opened a new pull request #10855:
URL: https://github.com/apache/kafka/pull/10855


   Remove unneeded `@SuppressWarnings("unchecked")` in source and test
   Remove unneeded `@SuppressWarnings("deprecated")` in source and test
   
   Several of those annotations were either never needed, or the code that 
force its introduction is already gone.
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on a change in pull request #10428: KAFKA-12572: Add import ordering checkstyle rule and configure an automatic formatter

2021-06-09 Thread GitBox


dongjinleekr commented on a change in pull request #10428:
URL: https://github.com/apache/kafka/pull/10428#discussion_r648435365



##
File path: README.md
##
@@ -207,6 +207,20 @@ You can run checkstyle using:
 The checkstyle warnings will be found in 
`reports/checkstyle/reports/main.html` and 
`reports/checkstyle/reports/test.html` files in the
 subproject build directories. They are also printed to the console. The build 
will fail if Checkstyle fails.
 
+As of present, the auto-formatting configuration is work in progress. 
Auto-formatting is automatically invoked for the modules listed below when the 
'checkstyleMain' or 'checkstyleTest' task is run.
+
+- (No modules specified yet)
+
+You can also run auto-formatting independently for a single module listed 
above, like:
+
+./gradlew :core:spotlessApply   # auto-format *.java files in core module, 
without running checkstyleMain or checkstyleTest.
+
+If you are using an IDE, you can use a plugin that provides real-time 
automatic formatting. For detailed information, refer to the following links:
+
+- [Eclipse](https://checkstyle.org/eclipse-cs)
+- [Intellij](https://plugins.jetbrains.com/plugin/1065-checkstyle-idea)
+- 
[Vscode](https://marketplace.visualstudio.com/items?itemName=shengchen.vscode-checkstyle)
+

Review comment:
   I thought it would be better to update the list each time the streams packages are configured for auto-formatting - and since I am ready for this task, it will not take much time.

   Moreover, these descriptions apply to the whole project, don't they?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on a change in pull request #10428: KAFKA-12572: Add import ordering checkstyle rule and configure an automatic formatter

2021-06-09 Thread GitBox


dongjinleekr commented on a change in pull request #10428:
URL: https://github.com/apache/kafka/pull/10428#discussion_r648427370



##
File path: build.gradle
##
@@ -604,6 +625,9 @@ subprojects {
 description = 'Run checkstyle on all main Java sources'
   }
 
+  checkstyleMain.dependsOn('spotlessApply')
+  checkstyleTest.dependsOn('spotlessApply')

Review comment:
   Yes, we need this configuration; these dependencies are applied to the whole project, not only to the core module.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wenbingshen commented on a change in pull request #10853: KAFKA-12811: kafka-topics.sh should let the user know they cannot adj…ust the replication factor for a topic using the --alte

2021-06-09 Thread GitBox


wenbingshen commented on a change in pull request #10853:
URL: https://github.com/apache/kafka/pull/10853#discussion_r648413266



##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -574,6 +574,7 @@ object TopicCommand extends Logging {
   if (has(alterOpt)) {
 CommandLineUtils.checkInvalidArgsSet(parser, options, 
Set(bootstrapServerOpt, configOpt), Set(alterOpt),
 Some(kafkaConfigsCanAlterTopicConfigsViaBootstrapServer))
+CommandLineUtils.checkInvalidArgs(parser, options, 
replicationFactorOpt, Set(alterOpt))

Review comment:
   It’s okay if you just indicate that --alter cannot be used to update the 
replication-factor. Let's see what others say. :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #10850: KAFKA-12924 Replace EasyMock and PowerMock with Mockito in streams mo…

2021-06-09 Thread GitBox


vvcephei commented on a change in pull request #10850:
URL: https://github.com/apache/kafka/pull/10850#discussion_r648363737



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
##
@@ -18,87 +18,73 @@
 
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
-import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
 
 import java.util.Collections;
 import java.util.Map;
 import java.util.function.Supplier;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.mock;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.powermock.api.easymock.PowerMock.createMock;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replay;
-import static org.powermock.api.easymock.PowerMock.verify;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({StreamsMetricsImpl.class, Sensor.class})
+@RunWith(MockitoJUnitRunner.class)
 public class ProcessorNodeMetricsTest {
 
 private static final String THREAD_ID = "test-thread";
 private static final String TASK_ID = "test-task";
 private static final String PROCESSOR_NODE_ID = "test-processor";
 
-private final Sensor expectedSensor = mock(Sensor.class);
-private final Sensor expectedParentSensor = mock(Sensor.class);
-private final StreamsMetricsImpl streamsMetrics = 
createMock(StreamsMetricsImpl.class);
 private final Map tagMap = 
Collections.singletonMap("hello", "world");
 private final Map parentTagMap = 
Collections.singletonMap("hi", "universe");
 
-@Before
-public void setUp() {
-expect(streamsMetrics.version()).andStubReturn(Version.LATEST);
-mockStatic(StreamsMetricsImpl.class);
-}
+private final Sensor expectedSensor = Mockito.mock(Sensor.class);

Review comment:
   In general, it's good to try and avoid shuffling lines around and 
reformatting the code in functional PRs. It just makes it harder to ensure a 
good review.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
##
@@ -135,45 +124,38 @@ public void shouldGetFailedStreamThreadsSensor() {
 false,
 description
 );
-replay(StreamsMetricsImpl.class, streamsMetrics);
 
 final Sensor sensor = 
ClientMetrics.failedStreamThreadSensor(streamsMetrics);
-
-verify(StreamsMetricsImpl.class, streamsMetrics);

Review comment:
   Did we lose a verification here?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
##
@@ -208,11 +194,7 @@ private void setUpThroughputSensor(final String 
metricNamePrefix,
 }
 
 private void verifySensor(final Supplier sensorSupplier) {
-replay(StreamsMetricsImpl.class, streamsMetrics);
-
 final Sensor sensor = sensorSupplier.get();
-
-verify(StreamsMetricsImpl.class, streamsMetrics);

Review comment:
   Should we continue to verify the interactions?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
##
@@ -18,51 +18,38 @@
 
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
-import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
-import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.Collections;
 import java.util.Map;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP;
-import static org.easymock.EasyMock.expect;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.powermock.api.easymock.PowerMock.createMock;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replay;
-import static org.powermock.api.easymock.PowerMock.verify;
 
-@RunWith(PowerMockRunner.class)

[GitHub] [kafka] wenbingshen commented on a change in pull request #10853: KAFKA-12811: kafka-topics.sh should let the user know they cannot adj…ust the replication factor for a topic using the --alte

2021-06-09 Thread GitBox


wenbingshen commented on a change in pull request #10853:
URL: https://github.com/apache/kafka/pull/10853#discussion_r648397088



##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -574,6 +574,7 @@ object TopicCommand extends Logging {
   if (has(alterOpt)) {
 CommandLineUtils.checkInvalidArgsSet(parser, options, 
Set(bootstrapServerOpt, configOpt), Set(alterOpt),
 Some(kafkaConfigsCanAlterTopicConfigsViaBootstrapServer))
+CommandLineUtils.checkInvalidArgs(parser, options, 
replicationFactorOpt, Set(alterOpt))

Review comment:
   The following command still prompts `Missing required argument 
"[partitions]"`: `./bin/kafka-topics.sh --bootstrap-server `hostname`:9092 
--alter --topic test-01 --replica-assignment 
1002:1001,1003:1002,1001:1003,1001:1002,1002:1003,1003:1001,1002:1003 
--replication-factor 3`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

2021-06-09 Thread GitBox


lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r648383874



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
##
@@ -78,6 +79,10 @@
 throw new StreamsException("Fatal user code error in 
deserialization error callback", fatalUserException);
 }
 
+if (deserializationException instanceof ConfigException) {

Review comment:
   Makes sense, I'll shift around




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #10813: KAFKA-9559: Change default serde to be `null`

2021-06-09 Thread GitBox


lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r648383533



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
##
@@ -60,8 +60,8 @@ public void init(final InternalProcessorContext context) {
 this.context = context;
 final Serializer contextKeySerializer = 
ProcessorContextUtils.getKeySerializer(context);
 final Serializer contextValueSerializer = 
ProcessorContextUtils.getValueSerializer(context);
-keySerializer = prepareKeySerializer(keySerializer, 
contextKeySerializer, contextValueSerializer);
-valSerializer = prepareValueSerializer(valSerializer, 
contextKeySerializer, contextValueSerializer);
+keySerializer = prepareKeySerializer(keySerializer, 
contextKeySerializer, contextValueSerializer, this.name());
+valSerializer = prepareValueSerializer(valSerializer, 
contextKeySerializer, contextValueSerializer, this.name());

Review comment:
   @ableegoldman and I talked about this PR briefly yesterday, maybe she 
has thoughts here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-12468) Initial offsets are copied from source to target cluster

2021-06-09 Thread Alexis Josephides (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17360131#comment-17360131
 ] 

Alexis Josephides edited comment on KAFKA-12468 at 6/9/21, 2:43 PM:


We are also seeing this issue despite having <10 partitions per task.

[~askldjd] looking at your original configuration you have the `tasks.max` for 
both the Source and Checkpoint connector to be equal. When you increased the 
tasks on the Source connector to 500 to overcome the race condition did you 
find any starvation issues that resulted in a similar effect? Or, did you also 
increase the `tasks.max` on the Checkpoint connector too?

 


was (Author: ajosephides):
We are also seeing this issue despite having <10 partitions per task.

[~askldjd] looking at your original configuration you have the `tasks.max` for 
both the Source and Checkpoint connector to be equal. When you increased the 
tasks on the Source connector to 500 to overcome the race condition did you 
find any starvation issues that resulted in a similar effect?

 

> Initial offsets are copied from source to target cluster
> 
>
> Key: KAFKA-12468
> URL: https://issues.apache.org/jira/browse/KAFKA-12468
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Bart De Neuter
>Priority: Major
>
> We have an active-passive setup where  the 3 connectors from mirror maker 2 
> (heartbeat, checkpoint and source) are running on a dedicated Kafka connect 
> cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it 
> seems the offsets from the source cluster are initially copied to the target 
> cluster without translation. This causes a negative lag for all synced 
> consumer groups. Only when we reset the offsets for each topic/partition on 
> the target cluster and produce a record on the topic/partition in the source, 
> the sync starts working correctly. 
> I would expect that the consumer groups are synced but that the current 
> offsets of the source cluster are not copied to the target cluster.
> This is the configuration we are currently using:
> Heartbeat connector
>  
> {code:xml}
> {
>   "name": "mm2-mirror-heartbeat",
>   "config": {
> "name": "mm2-mirror-heartbeat",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "1",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
> Checkpoint connector:
> {code:xml}
> {
>   "name": "mm2-mirror-checkpoint",
>   "config": {
> "name": "mm2-mirror-checkpoint",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  Source connector:
> {code:xml}
> {
>   "name": "mm2-mirror-source",
>   "config": {
> "name": "mm2-mirror-source",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorSourceConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> 

[jira] [Commented] (KAFKA-12468) Initial offsets are copied from source to target cluster

2021-06-09 Thread Alexis Josephides (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17360131#comment-17360131
 ] 

Alexis Josephides commented on KAFKA-12468:
---

We are also seeing this issue despite having <10 partitions per task.

[~askldjd] looking at your original configuration you have the `tasks.max` for 
both the Source and Checkpoint connector to be equal. When you increased the 
tasks on the Source connector to 500 to overcome the race condition did you 
find any starvation issues that resulted in a similar effect?

 

> Initial offsets are copied from source to target cluster
> 
>
> Key: KAFKA-12468
> URL: https://issues.apache.org/jira/browse/KAFKA-12468
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Bart De Neuter
>Priority: Major
>
> We have an active-passive setup where  the 3 connectors from mirror maker 2 
> (heartbeat, checkpoint and source) are running on a dedicated Kafka connect 
> cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it 
> seems the offsets from the source cluster are initially copied to the target 
> cluster without translation. This causes a negative lag for all synced 
> consumer groups. Only when we reset the offsets for each topic/partition on 
> the target cluster and produce a record on the topic/partition in the source, 
> the sync starts working correctly. 
> I would expect that the consumer groups are synced but that the current 
> offsets of the source cluster are not copied to the target cluster.
> This is the configuration we are currently using:
> Heartbeat connector
>  
> {code:xml}
> {
>   "name": "mm2-mirror-heartbeat",
>   "config": {
> "name": "mm2-mirror-heartbeat",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "1",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
> Checkpoint connector:
> {code:xml}
> {
>   "name": "mm2-mirror-checkpoint",
>   "config": {
> "name": "mm2-mirror-checkpoint",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  Source connector:
> {code:xml}
> {
>   "name": "mm2-mirror-source",
>   "config": {
> "name": "mm2-mirror-source",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorSourceConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] C0urante opened a new pull request #10854: KAFKA-12717: Remove internal Connect converter properties

2021-06-09 Thread GitBox


C0urante opened a new pull request #10854:
URL: https://github.com/apache/kafka/pull/10854


   [Jira](https://issues.apache.org/jira/browse/KAFKA-12717) / 
[KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-738%3A+Removal+of+Connect%27s+internal+converter+properties)
   
   Changes:
   
   - Drop support for user-specified internal converters and their properties
   - Log a warning if users attempt to specify internal converter-related 
properties in their worker config
   - Remove all references to the internal converter properties from the code 
base aside from the above-described warning logic
   - Updated `Plugins` test for internal converter instantiation as a sanity 
check (though this is likely redundant given the immense coverage already 
provided by existing integration tests)
   - Adds an item to the upgrade notes about the removal of these properties
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10850: KAFKA-12924 Replace EasyMock and PowerMock with Mockito in streams mo…

2021-06-09 Thread GitBox


ijuma commented on a change in pull request #10850:
URL: https://github.com/apache/kafka/pull/10850#discussion_r648316500



##
File path: 
streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
##
@@ -125,8 +114,8 @@ public void shouldAddAliveStreamThreadsMetric() {
 public void shouldGetFailedStreamThreadsSensor() {
 final String name = "failed-stream-threads";
 final String description = "The number of failed stream threads since 
the start of the Kafka Streams client";
-expect(streamsMetrics.clientLevelSensor(name, 
RecordingLevel.INFO)).andReturn(expectedSensor);
-expect(streamsMetrics.clientLevelTagMap()).andReturn(tagMap);
+Mockito.when(streamsMetrics.clientLevelSensor(name, 
RecordingLevel.INFO)).thenReturn(expectedSensor);
+Mockito.when(streamsMetrics.clientLevelTagMap()).thenReturn(tagMap);

Review comment:
   From a style perspective, you can import `when` via a static import to 
make it more readable. Similarly for `eq` and so on.
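
   A minimal sketch of the suggested style (class, field and test names here are illustrative only):

   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.when;

   import java.util.Collections;
   import java.util.Map;
   import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
   import org.junit.Test;

   public class ClientMetricsStyleExampleTest {
       private final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class);
       private final Map<String, String> tagMap = Collections.singletonMap("hello", "world");

       @Test
       public void shouldReadNaturallyWithStaticImports() {
           // Stubbing reads as plain English once `when` is statically imported.
           when(streamsMetrics.clientLevelTagMap()).thenReturn(tagMap);
           // ... exercise ClientMetrics and assert as in the existing test ...
       }
   }
   ```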




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10850: KAFKA-12924 Replace EasyMock and PowerMock with Mockito in streams mo…

2021-06-09 Thread GitBox


ijuma commented on a change in pull request #10850:
URL: https://github.com/apache/kafka/pull/10850#discussion_r648316500



##
File path: 
streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
##
@@ -125,8 +114,8 @@ public void shouldAddAliveStreamThreadsMetric() {
 public void shouldGetFailedStreamThreadsSensor() {
 final String name = "failed-stream-threads";
 final String description = "The number of failed stream threads since 
the start of the Kafka Streams client";
-expect(streamsMetrics.clientLevelSensor(name, 
RecordingLevel.INFO)).andReturn(expectedSensor);
-expect(streamsMetrics.clientLevelTagMap()).andReturn(tagMap);
+Mockito.when(streamsMetrics.clientLevelSensor(name, 
RecordingLevel.INFO)).thenReturn(expectedSensor);
+Mockito.when(streamsMetrics.clientLevelTagMap()).thenReturn(tagMap);

Review comment:
   From a style perspective, you can import `when` via a static import to 
make it more readable.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10850: KAFKA-12924 Replace EasyMock and PowerMock with Mockito in streams mo…

2021-06-09 Thread GitBox


ijuma commented on a change in pull request #10850:
URL: https://github.com/apache/kafka/pull/10850#discussion_r648315914



##
File path: 
streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
##
@@ -21,39 +21,28 @@
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.Collections;
 import java.util.Map;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.CLIENT_LEVEL_GROUP;
 import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.powermock.api.easymock.PowerMock.createMock;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replay;
-import static org.powermock.api.easymock.PowerMock.verify;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({StreamsMetricsImpl.class, Sensor.class, ClientMetrics.class})
+@RunWith(MockitoJUnitRunner.class)

Review comment:
   Why do we need the runner? In other modules, we use mockito without the 
runner. The reason why we want to avoid this runner is that it's a JUnit 4 
thing and we want to migrate to JUnit 5 soonish.
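
   For illustration, a hedged sketch of the runner-less style (the test class is made up; mocks are simply created inline, so no `@RunWith` is needed):

   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;

   import org.apache.kafka.common.metrics.Sensor;
   import org.junit.Test;

   public class NoRunnerExampleTest {
       // Created directly via Mockito.mock(...), so neither MockitoJUnitRunner nor @Mock is required.
       private final Sensor expectedSensor = mock(Sensor.class);

       @Test
       public void shouldWorkWithoutMockitoJUnitRunner() {
           expectedSensor.record();
           verify(expectedSensor).record();
       }
   }
   ```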




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Moovlin opened a new pull request #10853: KAFKA-12811: kafka-topics.sh should let the user know they cannot adj…ust the replication factor for a topic using the --alter flag and not w

2021-06-09 Thread GitBox


Moovlin opened a new pull request #10853:
URL: https://github.com/apache/kafka/pull/10853


   KAFKA-12811: kafka-topics.sh should let the user know they cannot adj…ust 
the replication factor for a topic using the --alter flag and not warn about 
missing the --partition flag
   
   This work is my own and I license it to the project. 
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #10428: KAFKA-12572: Add import ordering checkstyle rule and configure an automatic formatter

2021-06-09 Thread GitBox


cadonna commented on a change in pull request #10428:
URL: https://github.com/apache/kafka/pull/10428#discussion_r648251540



##
File path: README.md
##
@@ -207,6 +207,20 @@ You can run checkstyle using:
 The checkstyle warnings will be found in 
`reports/checkstyle/reports/main.html` and 
`reports/checkstyle/reports/test.html` files in the
 subproject build directories. They are also printed to the console. The build 
will fail if Checkstyle fails.
 
+As of present, the auto-formatting configuration is work in progress. 
Auto-formatting is automatically invoked for the modules listed below when the 
'checkstyleMain' or 'checkstyleTest' task is run.
+
+- (No modules specified yet)
+
+You can also run auto-formatting independently for a single module listed 
above, like:
+
+./gradlew :core:spotlessApply   # auto-format *.java files in core module, 
without running checkstyleMain or checkstyleTest.
+
+If you are using an IDE, you can use a plugin that provides real-time 
automatic formatting. For detailed information, refer to the following links:
+
+- [Eclipse](https://checkstyle.org/eclipse-cs)
+- [Intellij](https://plugins.jetbrains.com/plugin/1065-checkstyle-idea)
+- 
[Vscode](https://marketplace.visualstudio.com/items?itemName=shengchen.vscode-checkstyle)
+

Review comment:
   I would remove this until we have it set up for Streams.

##
File path: build.gradle
##
@@ -604,6 +625,9 @@ subprojects {
 description = 'Run checkstyle on all main Java sources'
   }
 
+  checkstyleMain.dependsOn('spotlessApply')
+  checkstyleTest.dependsOn('spotlessApply')

Review comment:
   Don't you need to remove this to not format the code in core?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12922) MirrorCheckpointTask should close topic filter

2021-06-09 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass updated KAFKA-12922:

Description: MirrorCheckpointTask uses a TopicFilter, but never closes it 
and leaks resources.  (was: MirrorCheckpointTask utilizes a TopicFilter, but 
never closes it and leaks resources.)
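
A minimal sketch of the kind of fix this implies (field name and method placement assumed; hedged on the filter being AutoCloseable):

{code:java}
// Hedged sketch only: close the topic filter when the task stops.
@Override
public void stop() {
    Utils.closeQuietly(topicFilter, "topic filter");
    // ... existing shutdown logic ...
}
{code}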

> MirrorCheckpointTask should close topic filter
> --
>
> Key: KAFKA-12922
> URL: https://issues.apache.org/jira/browse/KAFKA-12922
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.8.0
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> MirrorCheckpointTask uses a TopicFilter, but never closes it and leaks 
> resources.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12922) MirrorCheckpointTask should close topic filter

2021-06-09 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass updated KAFKA-12922:

Description: MirrorCheckpointTask utilizes a TopicFilter, but never closes 
it and leaks resources.  (was: When a lot of connectors are restarted it turned 
out that underlying ConfigConsumers are not closed properly and from the logs 
we can see that the old ones are still running.

Turns out that MirrorCheckpointTask utilizes a TopicFilter, but never closes 
it, leaking resources.)

> MirrorCheckpointTask should close topic filter
> --
>
> Key: KAFKA-12922
> URL: https://issues.apache.org/jira/browse/KAFKA-12922
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.8.0
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> MirrorCheckpointTask utilizes a TopicFilter, but never closes it and leaks 
> resources.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming opened a new pull request #10852: MINOR: Replace easymock with mockito in log4j-appender

2021-06-09 Thread GitBox


dengziming opened a new pull request #10852:
URL: https://github.com/apache/kafka/pull/10852


   *More detailed description of your change*
   As the title
   
   *Summary of testing strategy (including rationale)*
   QA
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik edited a comment on pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-09 Thread GitBox


kowshik edited a comment on pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#issuecomment-857548583


   Thanks for the review @junrao! I've addressed the comments in 8f14879.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wycccccc commented on a change in pull request #10835: KAFKA-12905: Replace EasyMock and PowerMock with Mockito for NamedCacheMetricsTest

2021-06-09 Thread GitBox


wycccccc commented on a change in pull request #10835:
URL: https://github.com/apache/kafka/pull/10835#discussion_r648148202



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/NamedCacheMetricsTest.java
##
@@ -49,37 +41,28 @@
 private static final String HIT_RATIO_MIN_DESCRIPTION = "The minimum cache 
hit ratio";
 private static final String HIT_RATIO_MAX_DESCRIPTION = "The maximum cache 
hit ratio";
 
-private final StreamsMetricsImpl streamsMetrics = 
createMock(StreamsMetricsImpl.class);
-private final Sensor expectedSensor = mock(Sensor.class);
+private Sensor expectedSensor = Mockito.mock(Sensor.class);
 private final Map tagMap = mkMap(mkEntry("key", "value"));
 
 @Test
 public void shouldGetHitRatioSensorWithBuiltInMetricsVersionCurrent() {
 final String hitRatio = "hit-ratio";
-mockStatic(StreamsMetricsImpl.class);
-expect(streamsMetrics.version()).andStubReturn(Version.LATEST);
-expect(streamsMetrics.cacheLevelSensor(THREAD_ID, TASK_ID, STORE_NAME, 
hitRatio, RecordingLevel.DEBUG))
-.andReturn(expectedSensor);
-expect(streamsMetrics.cacheLevelTagMap(THREAD_ID, TASK_ID, 
STORE_NAME)).andReturn(tagMap);
+final StreamsMetricsImpl streamsMetrics = 
Mockito.mock(StreamsMetricsImpl.class);
+Mockito.when(streamsMetrics.cacheLevelSensor(THREAD_ID, TASK_ID, 
STORE_NAME, hitRatio, RecordingLevel.DEBUG)).thenReturn(expectedSensor);
+Mockito.when(streamsMetrics.cacheLevelTagMap(THREAD_ID, TASK_ID, 
STORE_NAME)).thenReturn(tagMap);
 StreamsMetricsImpl.addAvgAndMinAndMaxToSensor(
-expectedSensor,
-StreamsMetricsImpl.CACHE_LEVEL_GROUP,
-tagMap,
-hitRatio,
-HIT_RATIO_AVG_DESCRIPTION,
-HIT_RATIO_MIN_DESCRIPTION,
-HIT_RATIO_MAX_DESCRIPTION);
-replay(streamsMetrics);
-replay(StreamsMetricsImpl.class);
+expectedSensor,
+StreamsMetricsImpl.CACHE_LEVEL_GROUP,
+tagMap,
+hitRatio,
+HIT_RATIO_AVG_DESCRIPTION,
+HIT_RATIO_MIN_DESCRIPTION,
+HIT_RATIO_MAX_DESCRIPTION);

Review comment:
   Thank you for the review. @ijuma  @chia7712 @cadonna 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mdedetrich commented on a change in pull request #10839: KAFKA-12913: Make case class's final

2021-06-09 Thread GitBox


mdedetrich commented on a change in pull request #10839:
URL: https://github.com/apache/kafka/pull/10839#discussion_r648171013



##
File path: gradle/spotbugs-exclude.xml
##
@@ -132,6 +132,13 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
 
 
 
+

Review comment:
   A spurious false warning that was being created by spotbugs




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik edited a comment on pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-09 Thread GitBox


kowshik edited a comment on pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#issuecomment-857548583


   Thanks for the review @junrao! I've addressed the comments in 8f14879. I've 
also triggered a system test run on the most recent commit, I'll review it once 
it completes. The link is attached to the PR description.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze edited a comment on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

2021-06-09 Thread GitBox


lkokhreidze edited a comment on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-857567693


   Hi @cadonna
   FYI, I've opened a PR around rack aware standby task assignment 
https://github.com/apache/kafka/pull/10851


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

2021-06-09 Thread GitBox


lkokhreidze commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-857567693


   Hi @cadonna
   FYI, I've opened a PR around rack aware standby task assignment 
   https://github.com/apache/kafka/pull/10851


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-06-09 Thread GitBox


lkokhreidze commented on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-857564039


   Call for review @cadonna @vvcephei @ableegoldman


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze opened a new pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-06-09 Thread GitBox


lkokhreidze opened a new pull request #10851:
URL: https://github.com/apache/kafka/pull/10851


   This PR is part of 
[KIP-708](https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awareness+for+Kafka+Streams)
 and adds rack aware standby task assignment logic.
   
   Rack aware standby task assignment won't be functional until all parts of this KIP are merged.
   
   Splitting the work into three smaller PRs to make the review process easier to follow. The overall plan is the following:
   
     Rack aware standby task assignment logic.
   ⏭️  Protocol change, add clientTags to SubscriptionInfoData 
https://github.com/apache/kafka/pull/10802
   ⏭️  Add required configurations to StreamsConfig (public API change, at this 
point we should have full functionality)
   
   This PR implements the first point of the above-mentioned plan.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-09 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r648134244



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1788,14 +1559,8 @@ class Log(@volatile private var _dir: File,
 maybeHandleIOException(s"Error while truncating the entire log for 
$topicPartition in dir ${dir.getParent}") {
   debug(s"Truncate and start at offset $newOffset")
   lock synchronized {
-checkIfMemoryMappedBufferClosed()
-removeAndDeleteSegments(logSegments, asyncDelete = true, LogTruncation)
-addSegment(LogSegment.open(dir,
-  baseOffset = newOffset,
-  config = config,
-  time = time,
-  initFileSize = config.initFileSize,
-  preallocate = config.preallocate))
+val deletedSegments = localLog.truncateFullyAndStartAt(newOffset)
+deleteProducerSnapshots(deletedSegments, asyncDelete = true)

Review comment:
   Done in 8f14879.

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1812,37 +1577,36 @@ class Log(@volatile private var _dir: File,
 endOffset: Long
   ): Unit = {
 logStartOffset = startOffset
-nextOffsetMetadata = LogOffsetMetadata(endOffset, 
activeSegment.baseOffset, activeSegment.size)
-recoveryPoint = math.min(recoveryPoint, endOffset)
+localLog.updateLogEndOffset(endOffset)

Review comment:
   Done in 8f14879.

##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1010 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.{File, IOException}
+import java.nio.file.Files
+import java.text.NumberFormat
+import java.util.concurrent.atomic.AtomicLong
+import java.util.regex.Pattern
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{Logging, Scheduler}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{KafkaStorageException, 
OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.Seq
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+
+/**
+ * Holds the result of splitting a segment into one or more segments, see 
LocalLog.splitOverflowedSegment().
+ *
+ * @param deletedSegments segments deleted when splitting a segment
+ * @param newSegments new segments created when splitting a segment
+ */
+case class SplitSegmentResult(deletedSegments: Iterable[LogSegment], 
newSegments: Iterable[LogSegment])
+
+/**
+ * An append-only log for storing messages locally. The log is a sequence of 
LogSegments, each with a base offset.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety 
provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param segments The non-empty log segments recovered from disk
+ * @param recoveryPoint The offset at which to begin the next recovery i.e. 
the first offset which has not been flushed to disk
+ * @param nextOffsetMetadata The offset where the next message could be 
appended
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to 
asynchronously handle Log dir failure
+ */
+private[log] class LocalLog(@volatile private var _dir: File,
+@volatile var config: LogConfig,
+val segments: LogSegments,
+@volatile var recoveryPoint: Long,
+@volatile private var nextOffsetMetadata: 
LogOffsetMetadata,
+   

[GitHub] [kafka] wycccccc commented on a change in pull request #10835: KAFKA-12905: Replace EasyMock and PowerMock with Mockito for NamedCacheMetricsTest

2021-06-09 Thread GitBox


wycccccc commented on a change in pull request #10835:
URL: https://github.com/apache/kafka/pull/10835#discussion_r648145898



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/NamedCacheMetricsTest.java
##
@@ -49,37 +41,28 @@
 private static final String HIT_RATIO_MIN_DESCRIPTION = "The minimum cache 
hit ratio";
 private static final String HIT_RATIO_MAX_DESCRIPTION = "The maximum cache 
hit ratio";
 
-private final StreamsMetricsImpl streamsMetrics = 
createMock(StreamsMetricsImpl.class);
-private final Sensor expectedSensor = mock(Sensor.class);
+private Sensor expectedSensor = Mockito.mock(Sensor.class);

Review comment:
   OK, I have changed it. Thanks for your suggestion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



