[jira] [Updated] (KAFKA-12454) Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-12 Thread Wenbing Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbing Shen updated KAFKA-12454:
-
Affects Version/s: 2.8.0

> Add ERROR logging on kafka-log-dirs when given brokerIds do not  exist in 
> current kafka cluster
> ---
>
> Key: KAFKA-12454
> URL: https://issues.apache.org/jira/browse/KAFKA-12454
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.8.0
>Reporter: Wenbing Shen
>Assignee: Wenbing Shen
>Priority: Minor
>
> When non-existent brokerId values are given, the kafka-log-dirs tool fails with 
> a timeout error:
> Exception in thread "main" java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment. Call: describeLogDirs
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>  at kafka.admin.LogDirsCommand$.describe(LogDirsCommand.scala:50)
>  at kafka.admin.LogDirsCommand$.main(LogDirsCommand.scala:36)
>  at kafka.admin.LogDirsCommand.main(LogDirsCommand.scala)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
> for a node assignment. Call: describeLogDirs
>  
> When the brokerId entered by the user does not exist, an error message 
> indicating that the node is not present should be printed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wenbingshen commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-12 Thread GitBox


wenbingshen commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r593706745



##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -40,9 +40,16 @@ object LogDirsCommand {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
         val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
+        val clusterBrokers: Array[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
         val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
             case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+            case None => clusterBrokers
+        }
+
+        val nonExistBrokers: Array[Int] = brokerList.filterNot(brokerId => clusterBrokers.contains(brokerId))
+        if (!nonExistBrokers.isEmpty) {
+            System.err.println(s"The given node(s) does not exist from broker-list ${nonExistBrokers.mkString(",")}")
+            sys.exit(1)

Review comment:
   > Should we do this only when handling the brokers provided by the user? 
   > It does not make sense to validate the list of brokers otherwise. What do 
   > you think?
   
   Your suggestion is very good. I have changed the logic; please review.
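   For context, here is a minimal, self-contained Scala sketch of the validation flow 
   being discussed, restricted to the broker ids the user actually supplied. Names, 
   messages, and structure are illustrative, not the merged LogDirsCommand code:

```scala
// Sketch: check user-supplied broker ids against the ids known to the cluster
// and fail fast with an error instead of letting the admin call time out.
object BrokerListValidation {
  def resolveBrokers(userBrokerList: Option[String], clusterBrokers: Set[Int]): Seq[Int] =
    userBrokerList match {
      case Some(brokerListStr) =>
        val requested = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSeq
        // Only the brokers the user explicitly asked about need to exist.
        val missing = requested.filterNot(clusterBrokers.contains)
        if (missing.nonEmpty) {
          System.err.println(s"The given broker(s) do not exist: ${missing.mkString(",")}")
          sys.exit(1)
        }
        requested
      case None =>
        // No broker list given: describe every broker in the cluster, nothing to validate.
        clusterBrokers.toSeq
    }

  def main(args: Array[String]): Unit =
    println(resolveBrokers(Some("1,2"), Set(1, 2, 3)))  // prints List(1, 2)
}
```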





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-12 Thread GitBox


wenbingshen commented on pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#issuecomment-797884025


   > Thanks for the PR. I left a minor suggestion. Could we also add a test 
   > case?
   
   Thanks for your comment. Your suggestion is very good: we only need to 
   validate the nodes entered by the user. I have added a unit test; please 
   review it again. Thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12426) Missing logic to create partition.metadata files in RaftReplicaManager

2021-03-12 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300726#comment-17300726
 ] 

Jason Gustafson commented on KAFKA-12426:
-

[~jolshan] Thanks, good find. Since it is not a regression, it's tough to call 
it a blocker. I'd suggest we target 2.8.1 for now, but if the patch is not too 
crazy, we might still be able to get it in depending on RC progress.

> Missing logic to create partition.metadata files in RaftReplicaManager
> --
>
> Key: KAFKA-12426
> URL: https://issues.apache.org/jira/browse/KAFKA-12426
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Justine Olshan
>Priority: Major
>  Labels: kip-500
>
> As part of KIP-516, the broker should create a partition.metadata file for 
> each partition in order to keep track of the topicId. This is done through 
> `Partition.checkOrSetTopicId`. We have the logic for this in 
> `ReplicaManager.becomeLeaderOrFollower` already, but we need to implement 
> analogous logic in `RaftReplicaManager.handleMetadataRecords`.
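For context, the gist of the missing logic is persisting the topic id to a
partition.metadata file the first time a partition learns it, the way the existing
`Partition.checkOrSetTopicId` path does. A rough, self-contained Scala sketch of the
idea follows; the file name is real per KIP-516, but the field layout and helper names
here are simplified assumptions, not the actual RaftReplicaManager code:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Sketch: write a partition.metadata file containing the topic id if one
// does not exist yet for this partition directory.
object PartitionMetadataSketch {
  def checkOrSetTopicId(partitionDir: Path, topicId: String): Unit = {
    val metadataFile = partitionDir.resolve("partition.metadata")
    if (!Files.exists(metadataFile)) {
      Files.createDirectories(partitionDir)
      Files.write(metadataFile, s"topic_id: $topicId\n".getBytes(StandardCharsets.UTF_8))
    }
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("mytopic-0")
    checkOrSetTopicId(dir, "q9XculSoT9OWbNX9cPQb0w")
    print(new String(Files.readAllBytes(dir.resolve("partition.metadata")), StandardCharsets.UTF_8))
  }
}
```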



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

2021-03-12 Thread GitBox


ableegoldman commented on pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#issuecomment-797862761


   Merged to trunk and cherrypicked to 2.7 & 2.8 cc @vvcephei 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

2021-03-12 Thread GitBox


ableegoldman merged pull request #10311:
URL: https://github.com/apache/kafka/pull/10311


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

2021-03-12 Thread GitBox


ableegoldman commented on pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#issuecomment-797862288


   One unrelated test failure: 
`kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()`
   
   Going to merge



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10300: KAFKA-12452: Remove deprecated overloads of ProcessorContext#forward

2021-03-12 Thread GitBox


ableegoldman commented on a change in pull request #10300:
URL: https://github.com/apache/kafka/pull/10300#discussion_r593553762



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
##
@@ -155,29 +155,20 @@ public void shouldReturnEmptyHeadersIfHeadersAreNotSet() {
 assertThat(context.headers(), is(emptyIterable()));
 }
 
-@Test
-public void shouldThrowIllegalStateExceptionOnHeadersIfNoRecordContext() {

Review comment:
   Just want to make sure I understand the background here: as of KIP-478 
we no longer throw if there's no active RecordContext, but instead just return 
empty. Is that right? If so then yes, an updated test like 
`shouldReturnEmptyHeaderIfRecordContextIsNull` seems good to have





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming closed pull request #10189: MINOR: Update copyright year in NOTICE

2021-03-12 Thread GitBox


dengziming closed pull request #10189:
URL: https://github.com/apache/kafka/pull/10189


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

2021-03-12 Thread GitBox


dengziming commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r593541172



##
File path: clients/src/main/resources/common/message/VoteRequest.json
##
@@ -21,7 +21,7 @@
   "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
-{ "name": "ClusterId", "type": "string", "versions": "0+",
+{ "name": "ClusterId", "type": "string", "versions": "0+", "ignorable": true,

Review comment:
   This was checked in by mistake while testing; I will revert it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming opened a new pull request #10312: MINOR: Fix log statement whose placeholders are inconsistent with arguments

2021-03-12 Thread GitBox


dengziming opened a new pull request #10312:
URL: https://github.com/apache/kafka/pull/10312


   *More detailed description of your change*
   1. When the 2nd argument is an exception, we don't need a placeholder for it.
   2. The number of placeholders should match the number of arguments.
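   For illustration, the two rules above in a small, self-contained example 
   (the logger messages and values are made up):

```scala
import org.slf4j.LoggerFactory

// With SLF4J, a Throwable passed as the last argument has its stack trace
// logged without needing a {} placeholder; every other argument needs
// exactly one placeholder.
object LogPlaceholderExample {
  private val log = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    val cause = new IllegalStateException("boom")

    // Rule 1: exception as the trailing argument, no placeholder for it.
    log.warn("Failed to process record with key {}", "key-1", cause)

    // Rule 2: number of placeholders matches the number of arguments.
    log.info("Fetched {} records from partition {}", 42, 3)
  }
}
```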
   
   *Summary of testing strategy (including rationale)*
   QA
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-12 Thread GitBox


ableegoldman commented on pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#issuecomment-797844734


   Failed with unrelated 
`connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining()`
 and `kafka.server.ScramServerStartupTest.testAuthentications()`
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10300: KAFKA-12452: Remove deprecated overloads of ProcessorContext#forward

2021-03-12 Thread GitBox


mjsax commented on a change in pull request #10300:
URL: https://github.com/apache/kafka/pull/10300#discussion_r593539158



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
##
@@ -37,7 +37,6 @@
 
 public class ProcessorNode {
 
-// TODO: 'children' can be removed when #forward() via index is removed

Review comment:
   Cool. We can still change it later if we feel the need, as it's only 
internal stuff.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10300: KAFKA-12452: Remove deprecated overloads of ProcessorContext#forward

2021-03-12 Thread GitBox


mjsax commented on a change in pull request #10300:
URL: https://github.com/apache/kafka/pull/10300#discussion_r593538979



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##
@@ -280,61 +280,6 @@ public void testDrivingSimpleTopology() {
 assertTrue(outputTopic1.isEmpty());
 }
 
-
-@Test
-public void testDrivingMultiplexingTopology() {

Review comment:
   It's using the "old API" topology provided by `createMultiplexingTopology` 
(and, below, `createMultiplexByNameTopology`), i.e., `MultiplexByNameProcessor` 
and `MultiplexingProcessor`.
   
   I did double-check: the multiplexing case is still covered using the new API 
via `createMultiProcessorTimestampTopology`, which uses `FanOutTimestampProcessor`, 
the "new" equivalent of the old (and removed) `MultiplexByNameProcessor`. (For 
multiplexing by index we don't need a new test, since routing by index was 
removed on purpose and is no longer part of the new API.)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10300: KAFKA-12452: Remove deprecated overloads of ProcessorContext#forward

2021-03-12 Thread GitBox


mjsax commented on a change in pull request #10300:
URL: https://github.com/apache/kafka/pull/10300#discussion_r593538020



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
##
@@ -155,29 +155,20 @@ public void shouldReturnEmptyHeadersIfHeadersAreNotSet() {
 assertThat(context.headers(), is(emptyIterable()));
 }
 
-@Test
-public void shouldThrowIllegalStateExceptionOnHeadersIfNoRecordContext() {

Review comment:
   @ableegoldman Follow-up thought: should we keep this test, with the logic 
reversed, as `shouldReturnEmptyHeaderIfRecordContextIsNull`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #10189: MINOR: Update copyright year in NOTICE

2021-03-12 Thread GitBox


dengziming commented on pull request #10189:
URL: https://github.com/apache/kafka/pull/10189#issuecomment-797842701


   Closing this since it has been resolved in #10308.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #10243: KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose`

2021-03-12 Thread GitBox


dengziming commented on pull request #10243:
URL: https://github.com/apache/kafka/pull/10243#issuecomment-797842164


   > Thanks for the update. Have you tried to repeatedly run the test to verify 
   > that it resolves the issue?
   
   @dajac Of course, I tried many times to verify; it only fails occasionally 
   when I set waitTimeMs < 6000.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #10243: KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose`

2021-03-12 Thread GitBox


dengziming commented on a change in pull request #10243:
URL: https://github.com/apache/kafka/pull/10243#discussion_r593536795



##
File path: core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
##
@@ -476,14 +476,18 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     // New instance of consumer should be assigned partitions immediately and should see committed offsets.
     val assignSemaphore = new Semaphore(0)
     val consumer = createConsumerWithGroupId(groupId)
-    consumer.subscribe(Collections.singletonList(topic),  new ConsumerRebalanceListener {
+    consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener {
       def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = {
         assignSemaphore.release()
       }
       def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
       }})
-    consumer.poll(time.Duration.ofSeconds(3L))
-    assertTrue(assignSemaphore.tryAcquire(1, TimeUnit.SECONDS), "Assignment did not complete on time")
+
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(time.Duration.ZERO)

Review comment:
   Done!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #10310: KAFKA-12460; Do not allow raft truncation below high watermark

2021-03-12 Thread GitBox


hachikuji merged pull request #10310:
URL: https://github.com/apache/kafka/pull/10310


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12460) Raft should prevent truncation below high watermark

2021-03-12 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12460.
-
Resolution: Fixed

> Raft should prevent truncation below high watermark
> ---
>
> Key: KAFKA-12460
> URL: https://issues.apache.org/jira/browse/KAFKA-12460
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Eventually we will have to come up with some approach to recover from 
> committed data loss in the raft quorum (something akin to unclean leader 
> election for normal partitions).  For now, we would rather be stricter and 
> fail fast rather than allowing committed data to be silently lost. 
> Specifically, we can prevent any attempt to truncate below the high watermark 
> since this is a clear indication of data loss. The long term thought I have 
> in mind is to give users an --unsafe flag or something like that which can be 
> passed at startup in order to knowingly turn off the stricter validation in 
> order to let the quorum recover from a disaster scenario. This needs some 
> more thought though.
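The stricter validation described above boils down to refusing any truncation target
below the current high watermark. A toy Scala sketch of that guard, purely for
illustration (this is not the actual raft log implementation, and the class and method
names are made up):

```scala
// Toy model: truncating the replicated log below the high watermark would
// discard committed data, so fail fast instead of truncating silently.
class ToyReplicatedLog(private var entries: Vector[String], private val highWatermark: Long) {
  def truncateTo(offset: Long): Unit = {
    if (offset < highWatermark)
      throw new IllegalStateException(
        s"Attempt to truncate to offset $offset, which is below the high watermark $highWatermark")
    entries = entries.take(offset.toInt)
  }
}

object ToyReplicatedLogExample extends App {
  val log = new ToyReplicatedLog(Vector("a", "b", "c", "d"), highWatermark = 3L)
  log.truncateTo(3L) // allowed: at the high watermark
  log.truncateTo(1L) // throws IllegalStateException: committed data would be lost
}
```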



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

2021-03-12 Thread GitBox


wcarlson5 commented on pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#issuecomment-797829955


   LGTM



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

2021-03-12 Thread GitBox


ableegoldman commented on a change in pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#discussion_r593523423



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##
@@ -87,7 +87,9 @@ public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
                       taskManager.activeTaskIds(),
                       taskManager.standbyTaskIds());
 
-        if (streamThread.setState(State.PARTITIONS_REVOKED) != null && !partitions.isEmpty()) {
+        // We need to still invoke handleRevocation if the thread has been told to shut down, but we shouldn't ever
+        // transition away from PENDING_SHUTDOWN once it's been initiated (to anything other than DEAD)
+        if ((streamThread.setState(State.PARTITIONS_REVOKED) != null || streamThread.state() == State.PENDING_SHUTDOWN) && !partitions.isEmpty()) {

Review comment:
   I think this is the correct order (assuming you mean the order of 
`streamThread.setState(State.PARTITIONS_REVOKED) != null` relative to 
`streamThread.state() == State.PENDING_SHUTDOWN`?) -- if the thread is not in 
PENDING_SHUTDOWN when it reaches this line, the first condition should return 
true, which is what we want even if it does get transitioned to 
PENDING_SHUTDOWN immediately after the transition to PARTITIONS_REVOKED.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

2021-03-12 Thread GitBox


ableegoldman commented on a change in pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#discussion_r593522324



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -714,6 +714,13 @@ void runOnce() {
 
 final long pollLatency = pollPhase();
 
+        // Optimization to skip the rest of the processing loop in case the thread was requested to shut down during
+        // the poll phase

Review comment:
   🤦‍♀️ Oh wow, how did I not see that, lol. I'll just bump the log to INFO.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-12 Thread GitBox


ableegoldman commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593521930



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
 }
 }
 } else {
-requestRejoin();

Review comment:
   Hmm...but `resetStateAndRejoin(String.format("rebalance failed with 
retriable error %s", exception));` is only called in `joinGroupIfNeeded`  which 
is only called in `ensureActiveGroup`, which is in turn only invoked in 
`ConsumerCoordinator#poll`.
   That said,  inside `SyncGroupResponseHandler#handle` we would already have 
`rejoinNeeded = true` and only set it to false if the SyncGroup succeeds. So 
for that reason I guess we don't need the `requestRejoin` anywhere inside the 
SyncGroup handler





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

2021-03-12 Thread GitBox


wcarlson5 commented on a change in pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#discussion_r593517811



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##
@@ -87,7 +87,9 @@ public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
                       taskManager.activeTaskIds(),
                       taskManager.standbyTaskIds());
 
-        if (streamThread.setState(State.PARTITIONS_REVOKED) != null && !partitions.isEmpty()) {
+        // We need to still invoke handleRevocation if the thread has been told to shut down, but we shouldn't ever
+        // transition away from PENDING_SHUTDOWN once it's been initiated (to anything other than DEAD)
+        if ((streamThread.setState(State.PARTITIONS_REVOKED) != null || streamThread.state() == State.PENDING_SHUTDOWN) && !partitions.isEmpty()) {

Review comment:
   do we need to be concerned about the oder these execute?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -714,6 +714,13 @@ void runOnce() {
 
 final long pollLatency = pollPhase();
 
+        // Optimization to skip the rest of the processing loop in case the thread was requested to shut down during
+        // the poll phase

Review comment:
   Good idea, but I think we already do this a few lines down.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception

2021-03-12 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300654#comment-17300654
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12462:


The only real downside in 2.7 is that we won't properly clean up the task, ie 
we'll skip committing the offsets and writing the checkpoint. So we'd lose any 
work we did since the last commit – for EOS this would be a perf hit since we'd 
probably need to restore the state stores from scratch after starting back up, 
whereas for ALOS we could get some overcounting. Not the end of the world, but 
worth fixing if we can

> Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state 
> exception 
> --
>
> Key: KAFKA-12462
> URL: https://issues.apache.org/jira/browse/KAFKA-12462
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: streams
> Fix For: 2.8.0, 2.7.1
>
>
> A thread was removed, sending it to the PENDING_SHUTDOWN state, but went 
> through a rebalance before completing the shutdown.
> {code:java}
> // [2021-03-07 04:33:39,385] DEBUG [i-07430efc31ad166b7-StreamThread-6] 
> stream-thread [i-07430efc31ad166b7-StreamThread-6] Ignoring request to 
> transit from PENDING_SHUTDOWN to PARTITIONS_REVOKED: only DEAD state is a 
> valid next state (org.apache.kafka.streams.processor.internals.StreamThread)
> {code}
> Inside StreamsRebalanceListener#onPartitionsRevoked, we have
> {code:java}
> // 
> if (streamThread.setState(State.PARTITIONS_REVOKED) != null && 
> !partitions.isEmpty())
> taskManager.handleRevocation(partitions);
> {code}
> Since PENDING_SHUTDOWN → PARTITIONS_REVOKED is a disallowed transition, we 
> never invoke TaskManager#handleRevocation. Currently handleRevocation is 
> responsible for preparing any active tasks for close, including committing 
> offsets and writing the checkpoint as well as suspending the task. We can’t 
> close the task in handleRevocation since we still support EAGER rebalancing, 
> which invokes handleRevocation at the beginning of a rebalance on all tasks.
> The tasks that are actually revoked will be closed during 
> TaskManager#handleAssignment . The IllegalStateException is specifically 
> because we don’t suspend the task before attempting to close it, and the 
> direct transition from RUNNING → CLOSED is forbidden.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-12 Thread GitBox


guozhangwang commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593515777



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
 }
 }
 } else {
-requestRejoin();

Review comment:
   I think we do not need to, since it would be called via 
`resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception))` 
--- previously we were calling rejoin twice.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

2021-03-12 Thread GitBox


ableegoldman commented on pull request #10311:
URL: https://github.com/apache/kafka/pull/10311#issuecomment-797820964


   @vvcephei @wcarlson5 @lct45 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN

2021-03-12 Thread GitBox


ableegoldman opened a new pull request #10311:
URL: https://github.com/apache/kafka/pull/10311


   Also check the state after the poll phase and exit the StreamThread 
processing loop early if the thread is in PENDING_SHUTDOWN



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception

2021-03-12 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-12462:
--

Assignee: A. Sophie Blee-Goldman

> Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state 
> exception 
> --
>
> Key: KAFKA-12462
> URL: https://issues.apache.org/jira/browse/KAFKA-12462
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: streams
> Fix For: 2.8.0
>
>
> A thread was removed, sending it to the PENDING_SHUTDOWN state, but went 
> through a rebalance before completing the shutdown.
> {code:java}
> // [2021-03-07 04:33:39,385] DEBUG [i-07430efc31ad166b7-StreamThread-6] 
> stream-thread [i-07430efc31ad166b7-StreamThread-6] Ignoring request to 
> transit from PENDING_SHUTDOWN to PARTITIONS_REVOKED: only DEAD state is a 
> valid next state (org.apache.kafka.streams.processor.internals.StreamThread)
> {code}
> Inside StreamsRebalanceListener#onPartitionsRevoked, we have
> {code:java}
> // 
> if (streamThread.setState(State.PARTITIONS_REVOKED) != null && 
> !partitions.isEmpty())
> taskManager.handleRevocation(partitions);
> {code}
> Since PENDING_SHUTDOWN → PARTITIONS_REVOKED is a disallowed transition, we 
> never invoke TaskManager#handleRevocation. Currently handleRevocation is 
> responsible for preparing any active tasks for close, including committing 
> offsets and writing the checkpoint as well as suspending the task. We can’t 
> close the task in handleRevocation since we still support EAGER rebalancing, 
> which invokes handleRevocation at the beginning of a rebalance on all tasks.
> The tasks that are actually revoked will be closed during 
> TaskManager#handleAssignment . The IllegalStateException is specifically 
> because we don’t suspend the task before attempting to close it, and the 
> direct transition from RUNNING → CLOSED is forbidden.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception

2021-03-12 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson updated KAFKA-12462:
---
Component/s: streams

> Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state 
> exception 
> --
>
> Key: KAFKA-12462
> URL: https://issues.apache.org/jira/browse/KAFKA-12462
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Priority: Blocker
>  Labels: streams
> Fix For: 2.8.0
>
>
> A thread was removed, sending it to the PENDING_SHUTDOWN state, but went 
> through a rebalance before completing the shutdown.
> {code:java}
> // [2021-03-07 04:33:39,385] DEBUG [i-07430efc31ad166b7-StreamThread-6] 
> stream-thread [i-07430efc31ad166b7-StreamThread-6] Ignoring request to 
> transit from PENDING_SHUTDOWN to PARTITIONS_REVOKED: only DEAD state is a 
> valid next state (org.apache.kafka.streams.processor.internals.StreamThread)
> {code}
> Inside StreamsRebalanceListener#onPartitionsRevoked, we have
> {code:java}
> // 
> if (streamThread.setState(State.PARTITIONS_REVOKED) != null && 
> !partitions.isEmpty())
> taskManager.handleRevocation(partitions);
> {code}
> Since PENDING_SHUTDOWN → PARTITIONS_REVOKED is a disallowed transition, we 
> never invoke TaskManager#handleRevocation. Currently handleRevocation is 
> responsible for preparing any active tasks for close, including committing 
> offsets and writing the checkpoint as well as suspending the task. We can’t 
> close the task in handleRevocation since we still support EAGER rebalancing, 
> which invokes handleRevocation at the beginning of a rebalance on all tasks.
> The tasks that are actually revoked will be closed during 
> TaskManager#handleAssignment . The IllegalStateException is specifically 
> because we don’t suspend the task before attempting to close it, and the 
> direct transition from RUNNING → CLOSED is forbidden.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception

2021-03-12 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson updated KAFKA-12462:
---
Affects Version/s: 2.8.0

> Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state 
> exception 
> --
>
> Key: KAFKA-12462
> URL: https://issues.apache.org/jira/browse/KAFKA-12462
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Priority: Blocker
> Fix For: 2.8.0
>
>
> A thread was removed, sending it to the PENDING_SHUTDOWN state, but went 
> through a rebalance before completing the shutdown.
> {code:java}
> // [2021-03-07 04:33:39,385] DEBUG [i-07430efc31ad166b7-StreamThread-6] 
> stream-thread [i-07430efc31ad166b7-StreamThread-6] Ignoring request to 
> transit from PENDING_SHUTDOWN to PARTITIONS_REVOKED: only DEAD state is a 
> valid next state (org.apache.kafka.streams.processor.internals.StreamThread)
> {code}
> Inside StreamsRebalanceListener#onPartitionsRevoked, we have
> {code:java}
> // 
> if (streamThread.setState(State.PARTITIONS_REVOKED) != null && 
> !partitions.isEmpty())
> taskManager.handleRevocation(partitions);
> {code}
> Since PENDING_SHUTDOWN → PARTITIONS_REVOKED is a disallowed transition, we 
> never invoke TaskManager#handleRevocation. Currently handleRevocation is 
> responsible for preparing any active tasks for close, including committing 
> offsets and writing the checkpoint as well as suspending the task. We can’t 
> close the task in handleRevocation since we still support EAGER rebalancing, 
> which invokes handleRevocation at the beginning of a rebalance on all tasks.
> The tasks that are actually revoked will be closed during 
> TaskManager#handleAssignment . The IllegalStateException is specifically 
> because we don’t suspend the task before attempting to close it, and the 
> direct transition from RUNNING → CLOSED is forbidden.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception

2021-03-12 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson updated KAFKA-12462:
---
Labels: streams  (was: )

> Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state 
> exception 
> --
>
> Key: KAFKA-12462
> URL: https://issues.apache.org/jira/browse/KAFKA-12462
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Priority: Blocker
>  Labels: streams
> Fix For: 2.8.0
>
>
> A thread was removed, sending it to the PENDING_SHUTDOWN state, but went 
> through a rebalance before completing the shutdown.
> {code:java}
> // [2021-03-07 04:33:39,385] DEBUG [i-07430efc31ad166b7-StreamThread-6] 
> stream-thread [i-07430efc31ad166b7-StreamThread-6] Ignoring request to 
> transit from PENDING_SHUTDOWN to PARTITIONS_REVOKED: only DEAD state is a 
> valid next state (org.apache.kafka.streams.processor.internals.StreamThread)
> {code}
> Inside StreamsRebalanceListener#onPartitionsRevoked, we have
> {code:java}
> // 
> if (streamThread.setState(State.PARTITIONS_REVOKED) != null && 
> !partitions.isEmpty())
> taskManager.handleRevocation(partitions);
> {code}
> Since PENDING_SHUTDOWN → PARTITIONS_REVOKED is a disallowed transition, we 
> never invoke TaskManager#handleRevocation. Currently handleRevocation is 
> responsible for preparing any active tasks for close, including committing 
> offsets and writing the checkpoint as well as suspending the task. We can’t 
> close the task in handleRevocation since we still support EAGER rebalancing, 
> which invokes handleRevocation at the beginning of a rebalance on all tasks.
> The tasks that are actually revoked will be closed during 
> TaskManager#handleAssignment . The IllegalStateException is specifically 
> because we don’t suspend the task before attempting to close it, and the 
> direct transition from RUNNING → CLOSED is forbidden.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception

2021-03-12 Thread Walker Carlson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300652#comment-17300652
 ] 

Walker Carlson commented on KAFKA-12462:


If we were shutting down the whole client the thread would become dead either 
way. In 2.7 I think the only impact it would have is that the handler would get 
called after the close call when it shouldn’t. But otherwise it might not have 
an effect. I suppose there is no harm in backporting, though.

I definitely don't think it is worth cutting a new RC for anything that does not 
have removeThread in it.

 

> Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state 
> exception 
> --
>
> Key: KAFKA-12462
> URL: https://issues.apache.org/jira/browse/KAFKA-12462
> Project: Kafka
>  Issue Type: Bug
>Reporter: Walker Carlson
>Priority: Blocker
> Fix For: 2.8.0
>
>
> A thread was removed, sending it to the PENDING_SHUTDOWN state, but went 
> through a rebalance before completing the shutdown.
> {code:java}
> // [2021-03-07 04:33:39,385] DEBUG [i-07430efc31ad166b7-StreamThread-6] 
> stream-thread [i-07430efc31ad166b7-StreamThread-6] Ignoring request to 
> transit from PENDING_SHUTDOWN to PARTITIONS_REVOKED: only DEAD state is a 
> valid next state (org.apache.kafka.streams.processor.internals.StreamThread)
> {code}
> Inside StreamsRebalanceListener#onPartitionsRevoked, we have
> {code:java}
> // 
> if (streamThread.setState(State.PARTITIONS_REVOKED) != null && 
> !partitions.isEmpty())
> taskManager.handleRevocation(partitions);
> {code}
> Since PENDING_SHUTDOWN → PARTITIONS_REVOKED is a disallowed transition, we 
> never invoke TaskManager#handleRevocation. Currently handleRevocation is 
> responsible for preparing any active tasks for close, including committing 
> offsets and writing the checkpoint as well as suspending the task. We can’t 
> close the task in handleRevocation since we still support EAGER rebalancing, 
> which invokes handleRevocation at the beginning of a rebalance on all tasks.
> The tasks that are actually revoked will be closed during 
> TaskManager#handleAssignment . The IllegalStateException is specifically 
> because we don’t suspend the task before attempting to close it, and the 
> direct transition from RUNNING → CLOSED is forbidden.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-12 Thread GitBox


ableegoldman commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593510070



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
 }
 }
 } else {
-requestRejoin();

Review comment:
   Ok cool, thanks. One last question then: after this refactoring, since 
we no longer call `requestRejoinOnResponseError` below, should we re-add the 
`requestRejoin()` call here? Or add a `requestRejoin` to the specific cases in 
the SyncGroup handler, eg
   ```
   } else if (error == Errors.REBALANCE_IN_PROGRESS) {
   log.info("SyncGroup failed: The group began another rebalance. Need to 
re-join the group. " +
 "Sent generation was {}", sentGeneration);
   future.raise(error);
   }
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] levzem commented on pull request #10299: KAFKA-10070: parameterize Connect unit tests to remove code duplication

2021-03-12 Thread GitBox


levzem commented on pull request #10299:
URL: https://github.com/apache/kafka/pull/10299#issuecomment-797816232


   The failing tests are unrelated to the changes:
   
   `Build / JDK 11 / 
kafka.api.PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup()`
   `Build / JDK 15 / 
org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining`
   
   @mimaison thanks for the review, mind taking another pass?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception

2021-03-12 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300650#comment-17300650
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12462:


Thanks Walker! This actually seems like a long-lurking bug that was just 
surfaced by the removeStreamThread() feature, not caused by it. Before we could 
remove threads this was only possible when shutting down the client, which we 
don’t test as frequently as we now do removeStreamThread(). It’s also hard to 
notice that a bug has caused thread(s) to die when the threads were supposed to 
shut down anyways. But now we might only be removing one thread, and thanks to 
the new exception handler we’ll shut down the whole application upon hitting 
this so the thread won’t just quietly die.

We should consider backporting the fix to 2.7, even though the bug isn't going 
to be as frequent or as bad in earlier versions. I wouldn't cut a new RC for 
2.6.2 over this, but we might as well backport to get the fix in 2.7.1 whenever 
that comes out

> Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state 
> exception 
> --
>
> Key: KAFKA-12462
> URL: https://issues.apache.org/jira/browse/KAFKA-12462
> Project: Kafka
>  Issue Type: Bug
>Reporter: Walker Carlson
>Priority: Blocker
> Fix For: 2.8.0
>
>
> A thread was removed, sending it to the PENDING_SHUTDOWN state, but went 
> through a rebalance before completing the shutdown.
> {code:java}
> // [2021-03-07 04:33:39,385] DEBUG [i-07430efc31ad166b7-StreamThread-6] 
> stream-thread [i-07430efc31ad166b7-StreamThread-6] Ignoring request to 
> transit from PENDING_SHUTDOWN to PARTITIONS_REVOKED: only DEAD state is a 
> valid next state (org.apache.kafka.streams.processor.internals.StreamThread)
> {code}
> Inside StreamsRebalanceListener#onPartitionsRevoked, we have
> {code:java}
> // 
> if (streamThread.setState(State.PARTITIONS_REVOKED) != null && 
> !partitions.isEmpty())
> taskManager.handleRevocation(partitions);
> {code}
> Since PENDING_SHUTDOWN → PARTITIONS_REVOKED is a disallowed transition, we 
> never invoke TaskManager#handleRevocation. Currently handleRevocation is 
> responsible for preparing any active tasks for close, including committing 
> offsets and writing the checkpoint as well as suspending the task. We can’t 
> close the task in handleRevocation since we still support EAGER rebalancing, 
> which invokes handleRevocation at the beginning of a rebalance on all tasks.
> The tasks that are actually revoked will be closed during 
> TaskManager#handleAssignment . The IllegalStateException is specifically 
> because we don’t suspend the task before attempting to close it, and the 
> direct transition from RUNNING → CLOSED is forbidden.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

2021-03-12 Thread GitBox


hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r593509351



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -247,7 +267,7 @@ private boolean isVoter(int remoteNodeId) {
 return voterReplicaStates.containsKey(remoteNodeId);
 }
 
-private static class ReplicaState implements Comparable {
+private static abstract class ReplicaState implements Comparable {

Review comment:
   Good point.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception

2021-03-12 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-12462:
-
Fix Version/s: 2.8.0

> Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state 
> exception 
> --
>
> Key: KAFKA-12462
> URL: https://issues.apache.org/jira/browse/KAFKA-12462
> Project: Kafka
>  Issue Type: Bug
>Reporter: Walker Carlson
>Priority: Blocker
> Fix For: 2.8.0
>
>
> A thread was removed, sending it to the PENDING_SHUTDOWN state, but went 
> through a rebalance before completing the shutdown.
> {code:java}
> // [2021-03-07 04:33:39,385] DEBUG [i-07430efc31ad166b7-StreamThread-6] 
> stream-thread [i-07430efc31ad166b7-StreamThread-6] Ignoring request to 
> transit from PENDING_SHUTDOWN to PARTITIONS_REVOKED: only DEAD state is a 
> valid next state (org.apache.kafka.streams.processor.internals.StreamThread)
> {code}
> Inside StreamsRebalanceListener#onPartitionsRevoked, we have
> {code:java}
> // 
> if (streamThread.setState(State.PARTITIONS_REVOKED) != null && 
> !partitions.isEmpty())
> taskManager.handleRevocation(partitions);
> {code}
> Since PENDING_SHUTDOWN → PARTITIONS_REVOKED is a disallowed transition, we 
> never invoke TaskManager#handleRevocation. Currently handleRevocation is 
> responsible for preparing any active tasks for close, including committing 
> offsets and writing the checkpoint as well as suspending the task. We can’t 
> close the task in handleRevocation since we still support EAGER rebalancing, 
> which invokes handleRevocation at the beginning of a rebalance on all tasks.
> The tasks that are actually revoked will be closed during 
> TaskManager#handleAssignment . The IllegalStateException is specifically 
> because we don’t suspend the task before attempting to close it, and the 
> direct transition from RUNNING → CLOSED is forbidden.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception

2021-03-12 Thread Walker Carlson (Jira)
Walker Carlson created KAFKA-12462:
--

 Summary: Threads in PENDING_SHUTDOWN entering a rebalance can 
cause an illegal state exception 
 Key: KAFKA-12462
 URL: https://issues.apache.org/jira/browse/KAFKA-12462
 Project: Kafka
  Issue Type: Bug
Reporter: Walker Carlson


A thread was removed, sending it to the PENDING_SHUTDOWN state, but went 
through a rebalance before completing the shutdown.
{code:java}
// [2021-03-07 04:33:39,385] DEBUG [i-07430efc31ad166b7-StreamThread-6] 
stream-thread [i-07430efc31ad166b7-StreamThread-6] Ignoring request to transit 
from PENDING_SHUTDOWN to PARTITIONS_REVOKED: only DEAD state is a valid next 
state (org.apache.kafka.streams.processor.internals.StreamThread)
{code}
Inside StreamsRebalanceListener#onPartitionsRevoked, we have
{code:java}
// 
if (streamThread.setState(State.PARTITIONS_REVOKED) != null && 
!partitions.isEmpty())
taskManager.handleRevocation(partitions);
{code}
Since PENDING_SHUTDOWN → PARTITIONS_REVOKED is a disallowed transition, we 
never invoke TaskManager#handleRevocation. Currently handleRevocation is 
responsible for preparing any active tasks for close, including committing 
offsets and writing the checkpoint as well as suspending the task. We can’t 
close the task in handleRevocation since we still support EAGER rebalancing, 
which invokes handleRevocation at the beginning of a rebalance on all tasks.

The tasks that are actually revoked will be closed during 
TaskManager#handleAssignment . The IllegalStateException is specifically 
because we don’t suspend the task before attempting to close it, and the direct 
transition from RUNNING → CLOSED is forbidden.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on a change in pull request #10243: KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose`

2021-03-12 Thread GitBox


dajac commented on a change in pull request #10243:
URL: https://github.com/apache/kafka/pull/10243#discussion_r593505670



##
File path: core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
##
@@ -476,14 +476,18 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     // New instance of consumer should be assigned partitions immediately and should see committed offsets.
     val assignSemaphore = new Semaphore(0)
     val consumer = createConsumerWithGroupId(groupId)
-    consumer.subscribe(Collections.singletonList(topic),  new ConsumerRebalanceListener {
+    consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener {
       def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = {
         assignSemaphore.release()
       }
       def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
       }})
-    consumer.poll(time.Duration.ofSeconds(3L))
-    assertTrue(assignSemaphore.tryAcquire(1, TimeUnit.SECONDS), "Assignment did not complete on time")
+
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(time.Duration.ZERO)

Review comment:
   Should we keep a small poll timeout here? Something like 100ms?
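   A self-contained Scala sketch of the polling pattern under discussion, using the 
   small timeout suggested above. The condition and names are simplified stand-ins 
   (including a local `waitUntilTrue` that mimics the shape of 
   kafka.utils.TestUtils.waitUntilTrue), not the actual test code:

```scala
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean

// Poll with a small timeout inside a bounded retry loop until the rebalance
// listener fires, instead of relying on a single long poll.
object PollUntilAssignedSketch {
  def waitUntilTrue(condition: () => Boolean, msg: => String, waitTimeMs: Long = 15000L): Unit = {
    val deadline = System.currentTimeMillis() + waitTimeMs
    while (!condition()) {
      if (System.currentTimeMillis() > deadline) throw new AssertionError(msg)
      Thread.sleep(50L)
    }
  }

  def main(args: Array[String]): Unit = {
    val assigned = new AtomicBoolean(false)
    def poll(timeout: Duration): Unit = assigned.set(true) // fake stand-in for consumer.poll

    waitUntilTrue(() => {
      poll(Duration.ofMillis(100L)) // small poll timeout, as suggested above
      assigned.get()
    }, "Assignment did not complete on time")
    println("assigned")
  }
}
```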





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12460) Raft should prevent truncation below high watermark

2021-03-12 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12460:
--
Description: Eventually we will have to come up with some approach to 
recover from committed data loss in the raft quorum (something akin to unclean 
leader election for normal partitions).  For now, we would rather be stricter 
and fail fast rather than allowing committed data to be silently lost. 
Specifically, we can prevent any attempt to truncate below the high watermark 
since this is a clear indication of data loss. The long term thought I have in 
mind is to give users an --unsafe flag or something like that which can be 
passed at startup in order to knowingly turn off the stricter validation in 
order to let the quorum recover from a disaster scenario. This needs some more 
thought though.  (was: Eventually we will have to come up with some approach to 
recover from committed data loss in the raft quorum (something akin to unclean 
leader election for normal partitions).  For now, we would rather be stricter 
and fail fast rather than allowing committed data to be silently lost. 
Specifically, we can prevent any attempt to truncate below the high watermark 
since this is a clear indication of data loss. The long term thought I have in 
mind is to give users an --unsafe flag or something like that which can be 
passed at startup in order to knowingly turn of the stricter validation in 
order to let the quorum recover from a disaster scenario. This needs some more 
thought though.)

> Raft should prevent truncation below high watermark
> ---
>
> Key: KAFKA-12460
> URL: https://issues.apache.org/jira/browse/KAFKA-12460
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Eventually we will have to come up with some approach to recover from 
> committed data loss in the raft quorum (something akin to unclean leader 
> election for normal partitions).  For now, we would rather be stricter and 
> fail fast rather than allowing committed data to be silently lost. 
> Specifically, we can prevent any attempt to truncate below the high watermark 
> since this is a clear indication of data loss. The long term thought I have 
> in mind is to give users an --unsafe flag or something like that which can be 
> passed at startup in order to knowingly turn off the stricter validation in 
> order to let the quorum recover from a disaster scenario. This needs some 
> more thought though.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-12 Thread GitBox


guozhangwang commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593504141



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
 }
 }
 } else {
-requestRejoin();

Review comment:
   I added that function for the sync group handler that handles retriable `COORDINATOR_NOT_AVAILABLE / NOT_COORDINATOR` errors and any unexpected error. After the refactoring PR they now all fall into the `joinGroupIfNeeded` path:
   
   ```
   final RuntimeException exception = future.exception();
   
   resetJoinGroupFuture();
   
   if (exception instanceof UnknownMemberIdException ||
   exception instanceof IllegalGenerationException ||
   exception instanceof RebalanceInProgressException ||
   exception instanceof MemberIdRequiredException)
   continue;
   else if (!future.isRetriable())
   throw exception;
   
   resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception));
   timer.sleep(rebalanceConfig.retryBackoffMs);
   ```
   
   This is part of the principle I mentioned:
   
   ```
   We may reset generation and request rejoin in two different places: 1) in 
join/sync-group handler, and 2) in joinGroupIfNeeded, when the future is 
received. The principle is that these two should not overlap, and 2) is used as 
a fallback for those common errors from join/sync that we do not handle 
specifically.
   ```
   
   But I forgot to remove this function as part of the second pass; will remove.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-12 Thread GitBox


dajac commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r593503408



##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -40,9 +40,16 @@ object LogDirsCommand {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
 val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
+val clusterBrokers: Array[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
 val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
 case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+case None => clusterBrokers
+}
+
+val nonExistBrokers: Array[Int] = brokerList.filterNot(brokerId => clusterBrokers.contains(brokerId))
+if (!nonExistBrokers.isEmpty) {
+  System.err.println(s"The given node(s) does not exist from broker-list ${nonExistBrokers.mkString(",")}")
+  sys.exit(1)

Review comment:
   Should we do this only when handling the brokers provided by the user? It does not make sense to validate the list of brokers otherwise. What do you think?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

2021-03-12 Thread GitBox


jsancio commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r593492361



##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -247,7 +267,7 @@ private boolean isVoter(int remoteNodeId) {
 return voterReplicaStates.containsKey(remoteNodeId);
 }
 
-private static class ReplicaState implements Comparable {
+private static abstract class ReplicaState implements Comparable {

Review comment:
   Do we really need to distinguish between `VoterState` and `ObserverState`? For example, the only difference is `hasAcknowledgedLeader`. I would argue that we could just move this field to `ReplicaState` and say that for observers this value is always false or is ignored.
   
   I am leaning towards just updating the value irrespective of whether it is a voter or observer. This is probably useful to have when we implement quorum reassignment. We can document whatever semantics you decide as a comment for this type.
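
   For concreteness, a rough Java sketch of the shape this suggestion implies: a single `ReplicaState` that carries `hasAcknowledgedLeader` and updates it regardless of the replica's role. Everything besides that idea (field names, method names, the offset representation) is an assumption and simplified from the real `LeaderState` internals.

   ```
   import java.util.Optional;

   // Sketch only: one ReplicaState for both voters and observers; the
   // acknowledgement flag is updated irrespective of the replica's role.
   class ReplicaState implements Comparable<ReplicaState> {
       final int nodeId;
       Optional<Long> endOffset = Optional.empty(); // simplified stand-in for LogOffsetMetadata
       boolean hasAcknowledgedLeader = false;       // meaningful for voters; ignored for observers

       ReplicaState(int nodeId) {
           this.nodeId = nodeId;
       }

       void updateFetchState(long fetchOffset) {
           // Record the fetch and the acknowledgement for voters and observers alike.
           this.hasAcknowledgedLeader = true;
           this.endOffset = Optional.of(fetchOffset);
       }

       @Override
       public int compareTo(ReplicaState that) {
           // Sort by end offset, descending, so the leader can pick the majority
           // offset when advancing the high watermark (simplified).
           return Long.compare(that.endOffset.orElse(-1L), this.endOffset.orElse(-1L));
       }
   }
   ```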

##
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##
@@ -140,6 +160,22 @@ public void testUpdateHighWatermarkQuorumSizeThree() {
 assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark());
 }
 
+@Test
+public void testNonMonotonicHighWatermarkUpdate() {
+MockTime time = new MockTime();
+int node1 = 1;
+LeaderState state = newLeaderState(mkSet(localId, node1), 0L);
+state.updateLocalState(time.milliseconds(), new LogOffsetMetadata(10L));
+state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(10L));
+assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark());
+
+// Follower crashes and disk is lost. It fetches an earlier offset to rebuild state.
+state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(5L));

Review comment:
   Let's check that this call returns `false`.
   
   Let's also add a test that calls `getVoterEndOffsets` and checks the 
returned map is correct.
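
   A hedged sketch of what those additions could look like, mirroring the test above. Whether `updateReplicaState` returns a boolean and the exact shape of `getVoterEndOffsets` are assumptions, so only the key set is asserted.

   ```
   @Test
   public void testNonMonotonicUpdateReturnsFalse() {
       MockTime time = new MockTime();
       int node1 = 1;
       LeaderState state = newLeaderState(mkSet(localId, node1), 0L);
       state.updateLocalState(time.milliseconds(), new LogOffsetMetadata(10L));
       state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(10L));

       // The earlier offset should be tolerated but reported as not advancing anything.
       assertFalse(state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(5L)));

       // Sanity-check the per-voter end offsets; only the keys are asserted since the
       // expected values depend on whether the earlier offset is accepted.
       assertEquals(mkSet(localId, node1), state.getVoterEndOffsets().keySet());
   }
   ```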





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12461) Extend LogManager to cover the metadata topic

2021-03-12 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12461:
---

 Summary: Extend LogManager to cover the metadata topic
 Key: KAFKA-12461
 URL: https://issues.apache.org/jira/browse/KAFKA-12461
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson


The `@metadata` topic is not managed by `LogManager` since it uses a new 
snapshot-based retention policy. This means that it is not covered by the 
recovery and high watermark checkpoints. It would be useful to fix this. We can 
either extend `LogManager` so that it is aware of the snapshotting semantics 
implemented by the `@metadata` topic, or we can create something like a 
`RaftLogManager`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on a change in pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager

2021-03-12 Thread GitBox


abbccdda commented on a change in pull request #10135:
URL: https://github.com/apache/kafka/pull/10135#discussion_r593492752



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -179,17 +181,16 @@ class BrokerServer(
   val controllerNodes = RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
   val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
 
-  val forwardingChannelManager = BrokerToControllerChannelManager(
+  clientToControllerChannelManager = BrokerToControllerChannelManager(
 controllerNodeProvider,
 time,
 metrics,
 config,
-channelName = "forwarding",
+channelName = "clientToControllerChannel",

Review comment:
   Sg!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #10310: KAFKA-12460; Do not allow raft truncation below high watermark

2021-03-12 Thread GitBox


hachikuji opened a new pull request #10310:
URL: https://github.com/apache/kafka/pull/10310


   Initially we want to be strict about the loss of committed data for the 
`@metadata` topic. This patch ensures that truncation below the high watermark 
is not allowed. Note that `MockLog` already had the logic to do so, so the 
patch adds a similar check to `KafkaMetadataLog`. 
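
   As a rough illustration of the guard being described (the real `KafkaMetadataLog` is Scala and its internals are not shown here; all names below are invented):

   ```
   // Sketch: refuse any truncation below the current high watermark, since that
   // would silently discard data the quorum already considers committed.
   class TruncationGuard {
       private long highWatermark = 0L;
       private long logEndOffset = 0L;

       void truncateTo(long offset) {
           if (offset < highWatermark) {
               throw new IllegalArgumentException("Attempt to truncate to offset " + offset
                   + " which is below the current high watermark " + highWatermark);
           }
           logEndOffset = Math.min(logEndOffset, offset);
       }
   }
   ```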
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager

2021-03-12 Thread GitBox


hachikuji commented on a change in pull request #10135:
URL: https://github.com/apache/kafka/pull/10135#discussion_r593485718



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -179,17 +181,16 @@ class BrokerServer(
   val controllerNodes = RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
   val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
 
-  val forwardingChannelManager = BrokerToControllerChannelManager(
+  clientToControllerChannelManager = BrokerToControllerChannelManager(
 controllerNodeProvider,
 time,
 metrics,
 config,
-channelName = "forwarding",
+channelName = "clientToControllerChannel",

Review comment:
   How about "controllerForwardingChannel"? I think it fits for both cases 
we're handling.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12460) Raft should prevent truncation below high watermark

2021-03-12 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12460:

Description: Eventually we will have to come up with some approach to 
recover from committed data loss in the raft quorum (something akin to unclean 
leader election for normal partitions).  For now, we would rather be stricter 
and fail fast rather than allowing committed data to be silently lost. 
Specifically, we can prevent any attempt to truncate below the high watermark 
since this is a clear indication of data loss. The long term thought I have in 
mind is to give users an --unsafe flag or something like that which can be 
passed at startup in order to knowingly turn off the stricter validation in 
order to let the quorum recover from a disaster scenario. This needs some more 
thought though.  (was: Eventually we will have to come up with some approach to 
recover from committed data loss in the raft quorum (something akin to unclean 
leader election for normal partitions).  For now, we would rather be stricter 
and fail fast rather than allowing committed data to be silently lost. The long 
term thought I have in mind is to give users an --unsafe flag or something like 
that which can be passed at startup in order to knowingly turn off the stricter 
validation in order to let the quorum recover from a disaster scenario. This 
needs some more thought though.)

> Raft should prevent truncation below high watermark
> ---
>
> Key: KAFKA-12460
> URL: https://issues.apache.org/jira/browse/KAFKA-12460
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Eventually we will have to come up with some approach to recover from 
> committed data loss in the raft quorum (something akin to unclean leader 
> election for normal partitions).  For now, we would rather be stricter and 
> fail fast rather than allowing committed data to be silently lost. 
> Specifically, we can prevent any attempt to truncate below the high watermark 
> since this is a clear indication of data loss. The long term thought I have 
> in mind is to give users an --unsafe flag or something like that which can be 
> passed at startup in order to knowingly turn off the stricter validation in 
> order to let the quorum recover from a disaster scenario. This needs some 
> more thought though.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12460) Raft should not prevent truncation below high watermark

2021-03-12 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12460:
---

 Summary: Raft should not prevent truncation below high watermark
 Key: KAFKA-12460
 URL: https://issues.apache.org/jira/browse/KAFKA-12460
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Eventually we will have to come up with some approach to recover from committed 
data loss in the raft quorum (something akin to unclean leader election for 
normal partitions).  For now, we would rather be stricter and fail fast rather 
than allowing committed data to be silently lost. The long term thought I have 
in mind is to give users an --unsafe flag or something like that which can be 
passed at startup in order to knowingly turn off the stricter validation in 
order to let the quorum recover from a disaster scenario. This needs some more 
thought though.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12460) Raft should prevent truncation below high watermark

2021-03-12 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12460:

Summary: Raft should prevent truncation below high watermark  (was: Raft 
should not prevent truncation below high watermark)

> Raft should prevent truncation below high watermark
> ---
>
> Key: KAFKA-12460
> URL: https://issues.apache.org/jira/browse/KAFKA-12460
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Eventually we will have to come up with some approach to recover from 
> committed data loss in the raft quorum (something akin to unclean leader 
> election for normal partitions).  For now, we would rather be stricter and 
> fail fast rather than allowing committed data to be silently lost. The long 
> term thought I have in mind is to give users an --unsafe flag or something 
> like that which can be passed at startup in order to knowingly turn off the 
> stricter validation in order to let the quorum recover from a disaster 
> scenario. This needs some more thought though.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji opened a new pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

2021-03-12 Thread GitBox


hachikuji opened a new pull request #10309:
URL: https://github.com/apache/kafka/pull/10309


   Currently the Raft leader raises an exception if there is a non-monotonic 
update to the fetch offset of a replica. In a situation where the replica had 
lost its disk state, this would prevent the replica from being able to recover. 
In this patch, we relax the validation to address this problem. It is worth 
pointing out that this validation could not be relied on to protect from data 
loss after a voter has lost committed state.
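
   A minimal Java sketch of the relaxed validation, with invented names: rather than throwing on a fetch offset that moved backwards, the leader records the new offset and simply reports that nothing advanced.

   ```
   import java.util.Optional;

   class FetchOffsetTracker {
       private Optional<Long> endOffset = Optional.empty();

       // Returns true only if the follower's fetch offset moved forward. A backwards
       // move (e.g. a follower that lost its disk state) is accepted rather than
       // treated as a fatal error, but reported as not advancing.
       boolean updateEndOffset(long fetchOffset) {
           boolean advanced = endOffset.map(current -> fetchOffset > current).orElse(true);
           endOffset = Optional.of(fetchOffset);
           return advanced;
       }
   }
   ```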
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10300: KAFKA-12452: Remove deprecated overloads of ProcessorContext#forward

2021-03-12 Thread GitBox


ableegoldman commented on a change in pull request #10300:
URL: https://github.com/apache/kafka/pull/10300#discussion_r593468505



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
##
@@ -37,7 +37,6 @@
 
 public class ProcessorNode {
 
-// TODO: 'children' can be removed when #forward() via index is removed

Review comment:
   Fine with me





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10300: KAFKA-12452: Remove deprecated overloads of ProcessorContext#forward

2021-03-12 Thread GitBox


ableegoldman commented on a change in pull request #10300:
URL: https://github.com/apache/kafka/pull/10300#discussion_r593468044



##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##
@@ -280,61 +280,6 @@ public void testDrivingSimpleTopology() {
 assertTrue(outputTopic1.isEmpty());
 }
 
-
-@Test
-public void testDrivingMultiplexingTopology() {

Review comment:
   Just wondering, why do we remove this test (and the below)?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-12 Thread GitBox


ableegoldman commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r593463366



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
 }
 }
 } else {
-requestRejoin();

Review comment:
   @guozhangwang I think something may have been messed up during a 
merge/rebase: I no longer see `requestRejoinOnResponseError` being invoked 
anywhere





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-12426) Missing logic to create partition.metadata files in RaftReplicaManager

2021-03-12 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300616#comment-17300616
 ] 

Justine Olshan edited comment on KAFKA-12426 at 3/12/21, 9:26 PM:
--

While working on this bug, I caught another small bug in ReplicaManager due to 
changing the position of the topic ID check (to prevent handling a request for 
an inconsistent ID).

I realized that on the first LISR received on a newly created topic, the log 
will not yet be created when `checkOrSetTopicId` is called. This means the log 
will not store the topic ID until the second LISR request. This bug means that 
we lose the benefits of topic IDs on this first LISR pass, but we don't 
actively break things more than LISR pre-topic IDs.

A similar event will occur when restarting brokers. Logs will not yet be 
associated with the Partition object, so we won't be able to check the topic ID 
on the first LISR pass. This means the partition.metadata file is unable to 
serve its intended purpose in this case, so I think this case is a little worse.

Both cases are not worse than the pre-topic ID behavior we had before.



There is a pretty simple fix to this which I think I can include in this PR 
 [~ijuma] [~vvcephei] do you think this is a blocker for 2.8?


was (Author: jolshan):
While working on this bug, I caught another small bug in ReplicaManager due to 
changing the position of the topic ID check (to prevent handling a request for 
an inconsistent ID).

 I realized that on the first LISR received on a newly created topic, the log 
will not yet be created when `checkOrSetTopicId` is called. This means the log 
will not store the topic ID until the second LISR request. This bug means that 
we lose the benefits of topic IDs on this first LISR pass, but we don't 
actively break things more than LISR pre-topic IDs.

A similar event will occur when restarting brokers. Logs will not yet be 
associated with the Partition object, so we won't be able to check the topic ID 
on the first LISR pass. This means the partition.metadata file is unable to 
serve its intended purpose in this case, so I think this case is a little worse.

There is a pretty simple fix to this which I think I can include in this PR 
[~ijuma] [~vvcephei] do you think this is a blocker for 2.8?

> Missing logic to create partition.metadata files in RaftReplicaManager
> --
>
> Key: KAFKA-12426
> URL: https://issues.apache.org/jira/browse/KAFKA-12426
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Justine Olshan
>Priority: Major
>  Labels: kip-500
>
> As part of KIP-516, the broker should create a partition.metadata file for 
> each partition in order to keep track of the topicId. This is done through 
> `Partition.checkOrSetTopicId`. We have the logic for this in 
> `ReplicaManager.becomeLeaderOrFollower` already, but we need to implement 
> analogous logic in `RaftReplicaManager.handleMetadataRecords`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12426) Missing logic to create partition.metadata files in RaftReplicaManager

2021-03-12 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300616#comment-17300616
 ] 

Justine Olshan commented on KAFKA-12426:


While working on this bug, I caught another small bug in ReplicaManager due to 
changing the position of the topic ID check (to prevent handling a request for 
an inconsistent ID).

 I realized that on the first LISR received on a newly created topic, the log 
will not yet be created when `checkOrSetTopicId` is called. This means the log 
will not store the topic ID until the second LISR request. This bug means that 
we lose the benefits of topic IDs on this first LISR pass, but we don't 
actively break things more than LISR pre-topic IDs.

A similar event will occur when restarting brokers. Logs will not yet be 
associated with the Partition object, so we won't be able to check the topic ID 
on the first LISR pass. This means the partition.metadata file is unable to 
serve its intended purpose in this case, so I think this case is a little worse.

There is a pretty simple fix to this which I think I can include in this PR 
[~ijuma] [~vvcephei] do you think this is a blocker for 2.8?

> Missing logic to create partition.metadata files in RaftReplicaManager
> --
>
> Key: KAFKA-12426
> URL: https://issues.apache.org/jira/browse/KAFKA-12426
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Justine Olshan
>Priority: Major
>  Labels: kip-500
>
> As part of KIP-516, the broker should create a partition.metadata file for 
> each partition in order to keep track of the topicId. This is done through 
> `Partition.checkOrSetTopicId`. We have the logic for this in 
> `ReplicaManager.becomeLeaderOrFollower` already, but we need to implement 
> analogous logic in `RaftReplicaManager.handleMetadataRecords`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma merged pull request #10307: MINOR: Log project, gradle, java and scala versions at the start of the build

2021-03-12 Thread GitBox


ijuma merged pull request #10307:
URL: https://github.com/apache/kafka/pull/10307


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10278: KAFKA-10526: leader fsync deferral on write

2021-03-12 Thread GitBox


jsancio commented on a change in pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#discussion_r593419637



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1876,7 +1876,7 @@ private long maybeAppendBatches(
 BatchAccumulator.CompletedBatch batch = iterator.next();
 appendBatch(state, batch, currentTimeMs);
 }
-flushLeaderLog(state, currentTimeMs);
+//flushLeaderLog(state, currentTimeMs);

Review comment:
   The invariant that the leader must satisfy is that `highWatermark <= flushOffset`. The current implementation satisfies this by flushing after every append and implicitly defining `flushOffset == logEndOffset`. At a high level, I think the goal is to allow `highWatermark <= flushOffset <= logEndOffset`.
   
   On the follower, things are a little different. On the follower, `flushOffset == logEndOffset` must hold before a `Fetch` request can be sent. This is because the leader assumes that the fetch offset in the `Fetch` request is the offset that the follower has successfully replicated.
   
   The advantage of appending without flushing is lower replication latency: the leader cannot replicate record batches to the followers and observers until they have been appended to the log.
   
   I am not exactly sure how we want to implement this since I haven't looked at the details, but I think you are correct that on the leader side of things we want to increase the `flushOffset` in the `Fetch` request handling code as the leader attempts to increase the high watermark.
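
   To make the invariant concrete, a hedged Java sketch (all names invented) of a leader that appends without flushing and only forces a flush when the high watermark is about to pass the flushed offset, preserving `highWatermark <= flushOffset <= logEndOffset`.

   ```
   // Sketch: defer fsync on the leader, flushing only when required to keep
   // highWatermark <= flushOffset <= logEndOffset.
   class DeferredFlushLeaderLog {
       private long logEndOffset = 0L;
       private long flushOffset = 0L;
       private long highWatermark = 0L;

       void append(int recordCount) {
           // Appending does not flush, so replication to followers can begin immediately.
           logEndOffset += recordCount;
       }

       void maybeAdvanceHighWatermark(long candidate) {
           if (candidate > logEndOffset)
               throw new IllegalArgumentException("High watermark cannot exceed the log end offset");
           if (candidate > flushOffset)
               flush(); // make the data durable before exposing it as committed
           highWatermark = Math.max(highWatermark, candidate);
       }

       private void flush() {
           // fsync up to the end of the log (elided) and record the flushed offset.
           flushOffset = logEndOffset;
       }
   }
   ```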





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

2021-03-12 Thread GitBox


jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r593362414



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the 
topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the 
log.
* If a valid topic ID is provided, and the log exists but has no ID set, 
set the log ID to be the request ID.
*
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from 
the metadata records
+   * @return true if the received topic id is consistent, false otherwise
*/
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-// If the request had an invalid topic ID, then we assume that topic IDs 
are not supported.
-// The topic ID was not inconsistent, so return true.
-// If the log is empty, then we can not say that topic ID is inconsistent, 
so return true.
-if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-  true
-else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
   Hmmm I'm not a huge fan of this either since it also seems to be waiting 
to set the ID and it adds complexity to recovery. I'll take another look. Maybe 
I'm missing something.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

2021-03-12 Thread GitBox


jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r593351308



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the 
topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the 
log.
* If a valid topic ID is provided, and the log exists but has no ID set, 
set the log ID to be the request ID.
*
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from 
the metadata records
+   * @return true if the received topic id is consistent, false otherwise
*/
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-// If the request had an invalid topic ID, then we assume that topic IDs 
are not supported.
-// The topic ID was not inconsistent, so return true.
-// If the log is empty, then we can not say that topic ID is inconsistent, 
so return true.
-if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-  true
-else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
   Hmm this is a little tricky. One idea I had would be to check or set the 
ID in the partition object only (not the file or in Log memory) then when 
the partitions are passed in to `makeLeaders/Followers` and a new log is 
created, we will have the topic ID easily accessible to create the 
partition.metadata file. It kind of changes how we do things. We would either 
store in partition **_instead of_** or **_in addition to_**  Log. Not sure if 
we also want to change this for the ZK code. Thoughts?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

2021-03-12 Thread GitBox


jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r593351308



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the 
topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the 
log.
* If a valid topic ID is provided, and the log exists but has no ID set, 
set the log ID to be the request ID.
*
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from 
the metadata records
+   * @return true if the received topic id is consistent, false otherwise
*/
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-// If the request had an invalid topic ID, then we assume that topic IDs 
are not supported.
-// The topic ID was not inconsistent, so return true.
-// If the log is empty, then we can not say that topic ID is inconsistent, 
so return true.
-if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-  true
-else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
   Hmm this is a little tricky. One idea I had would be to set the ID in 
the partition object only (not the file or in Log memory) then when the 
partitions are passed in to `makeLeaders/Followers` and a new log is created, 
we will have the topic ID easily accessible to create the partition.metadata 
file. It kind of changes how we do things. We would either store in partition 
**_instead of_** or **_in addition to_**  Log. Not sure if we also want to 
change this for the ZK code. Thoughts?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

2021-03-12 Thread GitBox


jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r593351308



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the 
topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the 
log.
* If a valid topic ID is provided, and the log exists but has no ID set, 
set the log ID to be the request ID.
*
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from 
the metadata records
+   * @return true if the received topic id is consistent, false otherwise
*/
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-// If the request had an invalid topic ID, then we assume that topic IDs 
are not supported.
-// The topic ID was not inconsistent, so return true.
-// If the log is empty, then we can not say that topic ID is inconsistent, 
so return true.
-if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-  true
-else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
   Hmm this is a little tricky. One idea I had would be to check or set the 
ID in the partition object only (not the file) then when the partitions are 
passed in to `makeLeaders/Followers` and a new log is created, we will have the 
topic ID easily accessible to create the partition.metadata file. It kind of 
changes how we do things. We would either store in partition **_instead of_** 
or **_in addition to_**  Log. Not sure if we also want to change this for the 
ZK code. Thoughts?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

2021-03-12 Thread GitBox


jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r593351308



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the 
topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the 
log.
* If a valid topic ID is provided, and the log exists but has no ID set, 
set the log ID to be the request ID.
*
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from 
the metadata records
+   * @return true if the received topic id is consistent, false otherwise
*/
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-// If the request had an invalid topic ID, then we assume that topic IDs 
are not supported.
-// The topic ID was not inconsistent, so return true.
-// If the log is empty, then we can not say that topic ID is inconsistent, 
so return true.
-if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-  true
-else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
   Hmm this is a little tricky. One idea I had would be to check or set the 
ID in the partition object only (not the file) then when the partitions are 
passed in to `makeLeaders/Followers` and a new log is created, we will have the 
topic ID easily accessible to create the partition.metadata file. It kind of 
changes how we do things. We would either store in partition _instead of_ or 
_in addition to_  Log. Not sure if we also want to change this for the ZK code. 
Thoughts?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

2021-03-12 Thread GitBox


jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r593330038



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the 
topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the 
log.
* If a valid topic ID is provided, and the log exists but has no ID set, 
set the log ID to be the request ID.
*
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from 
the metadata records
+   * @return true if the received topic id is consistent, false otherwise
*/
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-// If the request had an invalid topic ID, then we assume that topic IDs 
are not supported.
-// The topic ID was not inconsistent, so return true.
-// If the log is empty, then we can not say that topic ID is inconsistent, 
so return true.
-if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-  true
-else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
   Hmm. Looking into this further, it seems that the log is created in 
RaftReplicaManager when making a leader or a follower. This occurs in the 
`handleMetadataRecords` method (non-deferring case) and the 
`endMetadataChangeDeferral` method.  Each requires a few methods to pass 
through before we get to `LogManager.getOrCreateLog`, which is not ideal to 
pass a topic ID through (especially when some of this code is shared with the 
ZK path, but not impossible)
   
   One thing I am wondering though is how I tested the changes. I think I 
assume the log is created already, but this code suggests that the log will not 
be created until `makeLeaders/Followers` in `handleMetadataRecords` in the non 
deferring case, and not created until `endMetadataChangeDeferral` in the 
deferring case. This means that where I set topic IDs now, we won't have a log 
on the first pass! So I think maybe it is worth it to pass the ID through these 
methods. I think I can keep the check of the topic ID where it is though. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

2021-03-12 Thread GitBox


jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r593330038



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the 
topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the 
log.
* If a valid topic ID is provided, and the log exists but has no ID set, 
set the log ID to be the request ID.
*
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from 
the metadata records
+   * @return true if the received topic id is consistent, false otherwise
*/
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-// If the request had an invalid topic ID, then we assume that topic IDs 
are not supported.
-// The topic ID was not inconsistent, so return true.
-// If the log is empty, then we can not say that topic ID is inconsistent, 
so return true.
-if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-  true
-else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
   Hmm. Looking into this further, it seems that the log is created in 
RaftReplicaManager when making a leader or a follower. This occurs in the 
`handleMetadataRecords` method (non-deferring case) and the 
`endMetadataChangeDeferral` method.  Each requires a few methods to pass 
through before we get to `LogManager.getOrCreateLog`, which is not ideal to 
pass a topic ID through (especially when some of this code is shared with the 
ZK path, but not impossible)
   
   One thing I am wondering though is how I tested the changes. I think I 
assume the log is created already, but this code suggests that the log will not 
be created until `makeLeaders/Followers` in the non deferring case, and not 
created until `endMetadataChangeDeferral` in the deferring case. This means 
that where I set topic IDs now, we won't have a log on the first pass! So I 
think maybe it is worth it to pass the ID through these methods. I think I can 
keep the check of the topic ID where it is though. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

2021-03-12 Thread GitBox


jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r593330038



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the 
topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the 
log.
* If a valid topic ID is provided, and the log exists but has no ID set, 
set the log ID to be the request ID.
*
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from 
the metadata records
+   * @return true if the received topic id is consistent, false otherwise
*/
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-// If the request had an invalid topic ID, then we assume that topic IDs 
are not supported.
-// The topic ID was not inconsistent, so return true.
-// If the log is empty, then we can not say that topic ID is inconsistent, 
so return true.
-if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-  true
-else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
   Hmm. Looking into this further, it seems that the log is created in 
RaftReplicaManager when making a leader or a follower. This occurs in the 
`handleMetadataRecords` method and the `endMetadataChangeDeferral` method.  
Each requires a few methods to pass through before we get to 
`LogManager.getOrCreateLog`, which is not ideal to pass a topic ID through 
(especially when some of this code is shared with the ZK path, but not 
impossible)
   
   One thing I am wondering though is how I tested the changes. I think I 
assume the log is created already, but this code suggests that the log will not 
be created until `makeLeaders/Followers` in the non deferring case, and not 
created until `endMetadataChangeDeferral` in the deferring case. This means 
that where I set topic IDs now, we won't have a log on the first pass! So I 
think maybe it is worth it to pass the ID through these methods. I think I can 
keep the check of the topic ID where it is though. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #10305: KAFKA-10357: Add missing repartition topic validation

2021-03-12 Thread GitBox


guozhangwang merged pull request #10305:
URL: https://github.com/apache/kafka/pull/10305


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #10308: MINOR: Update year in NOTICE

2021-03-12 Thread GitBox


mimaison commented on pull request #10308:
URL: https://github.com/apache/kafka/pull/10308#issuecomment-797619654


   cc @ijuma @mjsax 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison opened a new pull request #10308: MINOR: Update year in NOTICE

2021-03-12 Thread GitBox


mimaison opened a new pull request #10308:
URL: https://github.com/apache/kafka/pull/10308


   This is required to run a release as it is checked in release.py
   
   This needs to be applied to 2.8, 2.7 and 2.6 too
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] askldjd commented on pull request #8921: KAFKA-10160: Kafka MM2 consumer configuration

2021-03-12 Thread GitBox


askldjd commented on pull request #8921:
URL: https://github.com/apache/kafka/pull/8921#issuecomment-797598156


   > @askldjd Good catch, you're right it looks like this was only merged into 
2.4. I've reopened 
[KAFKA-10160](https://issues.apache.org/jira/browse/KAFKA-10160).
   > Let's see if @satishbellapu has time to open a PR in the next few days. 
Otherwise I'll port this change next week
   
   Thanks @mimaison. Really appreciate it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-03-12 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r593293236



##
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -80,15 +156,34 @@ public Errors error() {
 return Errors.forCode(data.errorCode());
 }
 
-public LinkedHashMap responseData() {
+public LinkedHashMap responseData(Map topicNames, short version) {
+if (version < 13)
+return toResponseDataMap();
+return toResponseDataMap(topicNames);
+
+}
+
+// TODO: Should be replaced or cleaned up. The idea is that in KafkaApis we need to reconstruct responseData even though we could have just passed in and out a map.
+//  With topic IDs, recreating the map takes a little more time since we have to get the topic name from the topic ID to name map.
+//  The refactor somewhat helps in KafkaApis, but we have to recompute the map instead of just returning it.
+//  This is unsafe in test cases now (FetchSessionTest) where it used to be safe. Before it would just pull the responseData map.
+//  If we wanted to recompute with topicNames we could call responseData(topicNames) however,
+//  now it will just return the version computed here.
+
+// Used when we can guarantee responseData is populated with all possible partitions
+// This occurs when we have a response version < 13 or we built the FetchResponse with
+// responseDataMap as a parameter and we have the same topic IDs available.
+public LinkedHashMap resolvedResponseData() {

Review comment:
   I suppose we could set them server side, but when we iterate through the 
topics, we would need to not include those whose topic IDs could not be 
resolved.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-03-12 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r593283525



##
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -80,15 +156,34 @@ public Errors error() {
 return Errors.forCode(data.errorCode());
 }
 
-public LinkedHashMap responseData() {
+public LinkedHashMap responseData(Map topicNames, short version) {
+if (version < 13)
+return toResponseDataMap();
+return toResponseDataMap(topicNames);
+
+}
+
+// TODO: Should be replaced or cleaned up. The idea is that in KafkaApis we need to reconstruct responseData even though we could have just passed in and out a map.
+//  With topic IDs, recreating the map takes a little more time since we have to get the topic name from the topic ID to name map.
+//  The refactor somewhat helps in KafkaApis, but we have to recompute the map instead of just returning it.
+//  This is unsafe in test cases now (FetchSessionTest) where it used to be safe. Before it would just pull the responseData map.
+//  If we wanted to recompute with topicNames we could call responseData(topicNames) however,
+//  now it will just return the version computed here.
+
+// Used when we can guarantee responseData is populated with all possible partitions
+// This occurs when we have a response version < 13 or we built the FetchResponse with
+// responseDataMap as a parameter and we have the same topic IDs available.
+public LinkedHashMap resolvedResponseData() {

Review comment:
   Got it. I think the issue here then is that some of the information cannot come from the auto-generated data. We need topic names for certain methods but names will not be in the auto-generated data. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10285: KAFKA-12442: Upgrade ZSTD JNI from 1.4.8-4 to 1.4.9-1

2021-03-12 Thread GitBox


ijuma commented on pull request #10285:
URL: https://github.com/apache/kafka/pull/10285#issuecomment-797576186


   @chia7712 worth cherry-picking to `2.8` if the aim is to align the versions 
with other projects that are releasing soon.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10203: KAFKA-12415: Prepare for Gradle 7.0 and restrict transitive scope for non api dependencies

2021-03-12 Thread GitBox


ijuma commented on a change in pull request #10203:
URL: https://github.com/apache/kafka/pull/10203#discussion_r593271376



##
File path: README.md
##
@@ -69,10 +69,6 @@ Generate coverage for a single module, i.e.:
 ### Building a binary release gzipped tar ball ###
 ./gradlew clean releaseTarGz
 
-The above command will fail if you haven't set up the signing key. To bypass signing the artifact, you can run:
-
-./gradlew clean releaseTarGz -x signArchives

Review comment:
   We figured it out; the build was using a non-snapshot version. Passing `-PskipSigning=true` fixed it. Submitted https://github.com/apache/kafka/pull/10307 to make it easier to debug these issues in the future.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #10307: MINOR: Log project, gradle, java and scala versions at the start of the build

2021-03-12 Thread GitBox


ijuma opened a new pull request #10307:
URL: https://github.com/apache/kafka/pull/10307


   This is useful when debugging build issues. I also removed two printlns that 
are now redundant, so
   this makes the build more informative and less noisy at the same time.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7525) Handling corrupt records

2021-03-12 Thread Alexandre Dupriez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300379#comment-17300379
 ] 

Alexandre Dupriez commented on KAFKA-7525:
--

It was fixed in 2.6.0: KAFKA-9206

> Handling corrupt records
> 
>
> Key: KAFKA-7525
> URL: https://issues.apache.org/jira/browse/KAFKA-7525
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, core
>Affects Versions: 1.1.0
>Reporter: Katarzyna Solnica
>Priority: Major
>
> When the Java consumer encounters a corrupt record on a partition it reads from, 
> it throws:
> {code:java}
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from XYZ. If needed, please seek past the record to continue 
> consumption.
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1125)
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993)
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527)
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>     (...)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 
> is less than the minimum record overhead (14){code}
> or:
> {code:java}
> java.lang.IllegalStateException: Unexpected error code 2 while fetching data
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:936)
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:485)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>     (...){code}
> 1. Could you consider throwing CorruptRecordException from 
> parseCompletedFetch() when error == Errors.CORRUPT_MESSAGE?
> 2. Seeking past the corrupt record means losing data. I've noticed that the 
> record is often correct on a follower ISR, and manually changing the partition 
> leader to the follower node solves the issue in case the partition is used by a 
> single consumer group. Couldn't the Kafka server discover such situations and 
> recover corrupt records from logs available on other ISRs somehow?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on a change in pull request #10203: KAFKA-12415: Prepare for Gradle 7.0 and restrict transitive scope for non api dependencies

2021-03-12 Thread GitBox


ijuma commented on a change in pull request #10203:
URL: https://github.com/apache/kafka/pull/10203#discussion_r593221711



##
File path: README.md
##
@@ -69,10 +69,6 @@ Generate coverage for a single module, i.e.:
 ### Building a binary release gzipped tar ball ###
 ./gradlew clean releaseTarGz
 
-The above command will fail if you haven't set up the signing key. To bypass signing the artifact, you can run:
-
-./gradlew clean releaseTarGz -x signArchives

Review comment:
   Note that your command has `install` while the README line I deleted did 
not (and was wrong). It is still weird that you're seeing this for snapshot 
builds. I'll work with you all offline to try and figure it out.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #10299: KAFKA-10070: parameterize Connect unit tests to remove code duplication

2021-03-12 Thread GitBox


mimaison commented on a change in pull request #10299:
URL: https://github.com/apache/kafka/pull/10299#discussion_r593134903



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/ParameterizedTest.java
##
@@ -0,0 +1,67 @@
+package org.apache.kafka.connect.util;

Review comment:
   We're missing the license header here

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
##
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import java.util.Collection;

Review comment:
   Should we move this with the other `java.util` imports below?

##
File path: checkstyle/suppressions.xml
##
@@ -158,7 +158,7 @@
   files="StreamThread.java"/>
 
 
+  files="(KStreamImpl|KTableImpl|WorkerSourceTaskTest).java"/>

Review comment:
   Instead of adding this to Kafka Streams rules, can we move it to the 
Connect section just above?

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/ParameterizedTest.java
##
@@ -0,0 +1,67 @@
+package org.apache.kafka.connect.util;
+
+import java.lang.annotation.Annotation;
+import org.junit.runner.Description;
+import org.junit.runner.manipulation.Filter;
+import org.junit.runner.manipulation.NoTestsRemainException;
+import org.junit.runners.Parameterized;
+
+/**
+ * Running a single parameterized test causes issue as explained in
+ * http://youtrack.jetbrains.com/issue/IDEA-65966 and
+ * https://stackoverflow.com/questions/12798079/initializationerror-with-eclipse-and-junit4-when-executing-a-single-test/18438718#18438718
+ *
+ * As a workaround, the original filter needs to be wrapped and then pass it a deparameterized
+ * description which removes the parameter part (See deparametrizeName)
+ */
+public class ParameterizedTest extends Parameterized {
+
+  public ParameterizedTest (Class klass) throws Throwable {

Review comment:
   Checkstyle fails because it expects indentations to be 4 spaces
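
For illustration, here is one way the filter-wrapping workaround described in the class comment above could look. This is only a hedged sketch against the standard JUnit 4 API; the class and helper names are made up and it is not the patch under review.

```java
import java.lang.annotation.Annotation;
import org.junit.runner.Description;
import org.junit.runner.manipulation.Filter;
import org.junit.runner.manipulation.NoTestsRemainException;
import org.junit.runners.Parameterized;

// Sketch only: wrap the incoming filter and strip the "[param]" suffix from
// test descriptions before delegating, so a single-test run from the IDE
// still matches the parameterized test names.
public class ParameterizedRunnerSketch extends Parameterized {

    public ParameterizedRunnerSketch(Class<?> klass) throws Throwable {
        super(klass);
    }

    @Override
    public void filter(Filter filter) throws NoTestsRemainException {
        super.filter(new Filter() {
            @Override
            public boolean shouldRun(Description description) {
                if (description.isTest())
                    return filter.shouldRun(deparametrize(description));
                // For suite descriptions, run if any child test would run.
                for (Description child : description.getChildren()) {
                    if (shouldRun(child))
                        return true;
                }
                return false;
            }

            @Override
            public String describe() {
                return filter.describe();
            }
        });
    }

    // Display names look like "methodName[param](className)"; rebuild the
    // description without the "[param]" part for filter matching.
    private static Description deparametrize(Description description) {
        String displayName = description.getDisplayName();
        int bracket = displayName.indexOf('[');
        if (bracket < 0)
            return description;
        return Description.createTestDescription(
            description.getTestClass(), displayName.substring(0, bracket),
            description.getAnnotations().toArray(new Annotation[0]));
    }
}
```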





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison merged pull request #10268: MINOR: output meaningful error message

2021-03-12 Thread GitBox


mimaison merged pull request #10268:
URL: https://github.com/apache/kafka/pull/10268


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #10268: MINOR: output meaningful error message

2021-03-12 Thread GitBox


mimaison commented on pull request #10268:
URL: https://github.com/apache/kafka/pull/10268#issuecomment-797445589


   We got to see it in action immediately, as the tests failed in the [JDK8 build](https://github.com/apache/kafka/pull/10268/checks?check_run_id=2037055584):
   ```
   java.lang.AssertionError: Connector MirrorCheckpointConnector tasks did not start in time on cluster: primary-connect-cluster
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison merged pull request #10192: MINOR: Add missing unit tests for Mirror Connect

2021-03-12 Thread GitBox


mimaison merged pull request #10192:
URL: https://github.com/apache/kafka/pull/10192


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10160) Kafka MM2 consumer configuration

2021-03-12 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-10160:
---
Fix Version/s: (was: 2.7.0)
   2.4.2

> Kafka MM2 consumer configuration
> 
>
> Key: KAFKA-10160
> URL: https://issues.apache.org/jira/browse/KAFKA-10160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Pavol Ipoth
>Assignee: sats
>Priority: Major
>  Labels: configuration, kafka, mirror-maker
> Fix For: 2.4.2
>
>
> [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51,]
>  According to this, producer/consumer-level properties should be configured as 
> e.g. somesource->sometarget.consumer.client.id. I tried to set 
> somesource->sometarget.consumer.auto.offset.reset=latest, but without 
> success; the consumer always tries to fetch from earliest. Not sure if this is 
> a bug or my misconfiguration, but then at least some update to the docs would be useful.
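
For reference, the per-flow override convention being discussed would look roughly like the sketch below. It is only an illustrative configuration snippet; whether the `consumer.` override is actually honoured is exactly what this ticket is about.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustrative MirrorMaker 2 configuration sketch; the per-flow
// "source->target.consumer.*" prefix is the convention under discussion.
final class Mm2OverrideExample {
    static Map<String, String> exampleProps() {
        Map<String, String> props = new HashMap<>();
        props.put("clusters", "somesource, sometarget");
        props.put("somesource->sometarget.enabled", "true");
        // Per-flow consumer override from the issue description:
        props.put("somesource->sometarget.consumer.auto.offset.reset", "latest");
        return props;
    }
}
{code}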



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on pull request #8921: KAFKA-10160: Kafka MM2 consumer configuration

2021-03-12 Thread GitBox


mimaison commented on pull request #8921:
URL: https://github.com/apache/kafka/pull/8921#issuecomment-797420022


   @askldjd Good catch, you're right it looks like this was only merged into 
2.4. I've reopened 
[KAFKA-10160](https://issues.apache.org/jira/browse/KAFKA-10160).
   Let's see if @satishbellapu has time to open a PR in the next few days. 
Otherwise I'll port this change next week



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10160) Kafka MM2 consumer configuration

2021-03-12 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300227#comment-17300227
 ] 

Mickael Maison commented on KAFKA-10160:


Reopening this issue as it seems it was only fixed in the 2.4 branch. 
[~sbellapu] can you open a PR against trunk? 

> Kafka MM2 consumer configuration
> 
>
> Key: KAFKA-10160
> URL: https://issues.apache.org/jira/browse/KAFKA-10160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Pavol Ipoth
>Assignee: sats
>Priority: Major
>  Labels: configuration, kafka, mirror-maker
> Fix For: 2.4.2
>
>
> [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51,]
>  According to this, producer/consumer-level properties should be configured as 
> e.g. somesource->sometarget.consumer.client.id. I tried to set 
> somesource->sometarget.consumer.auto.offset.reset=latest, but without 
> success; the consumer always tries to fetch from earliest. Not sure if this is 
> a bug or my misconfiguration, but then at least some update to the docs would be useful.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-10160) Kafka MM2 consumer configuration

2021-03-12 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reopened KAFKA-10160:


> Kafka MM2 consumer configuration
> 
>
> Key: KAFKA-10160
> URL: https://issues.apache.org/jira/browse/KAFKA-10160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Pavol Ipoth
>Assignee: sats
>Priority: Major
>  Labels: configuration, kafka, mirror-maker
> Fix For: 2.7.0
>
>
> [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51,]
>  According to this, producer/consumer-level properties should be configured as 
> e.g. somesource->sometarget.consumer.client.id. I tried to set 
> somesource->sometarget.consumer.auto.offset.reset=latest, but without 
> success; the consumer always tries to fetch from earliest. Not sure if this is 
> a bug or my misconfiguration, but then at least some update to the docs would be useful.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12449) Remove deprecated WindowStore#put

2021-03-12 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya updated KAFKA-12449:
-
Affects Version/s: 3.0.0

> Remove deprecated WindowStore#put
> -
>
> Key: KAFKA-12449
> URL: https://issues.apache.org/jira/browse/KAFKA-12449
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> Related to KIP-474: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jeqo edited a comment on pull request #10293: KAFKA-12449: Remove deprecated WindowStore#put

2021-03-12 Thread GitBox


jeqo edited a comment on pull request #10293:
URL: https://github.com/apache/kafka/pull/10293#issuecomment-796958801


   ~~Totally missed that this was proposed and accepted in KIP-474. Hope I'm not stepping on an existing implementation.~~
   
   Just realized the KIP only covers deprecating this method, and this change completes it with the removal. Turned the sub-task into a task in JIRA and linked it to the right KIP.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12449) Remove deprecated WindowStore#put

2021-03-12 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya updated KAFKA-12449:
-
Parent: (was: KAFKA-10434)
Issue Type: Task  (was: Sub-task)

> Remove deprecated WindowStore#put
> -
>
> Key: KAFKA-12449
> URL: https://issues.apache.org/jira/browse/KAFKA-12449
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> Related to KIP-474: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

2021-03-12 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593033011



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -76,6 +76,11 @@
 "wait for writes to accumulate before flushing them to disk.";

Review comment:
   I updated the doc for the maxUnflushedBytes config.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

2021-03-12 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593032627



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -76,6 +76,11 @@
 "wait for writes to accumulate before flushing them to disk.";
 public static final int DEFAULT_QUORUM_LINGER_MS = 25;
 
+public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = QUORUM_PREFIX + "append.max.unflushed.bytes";

Review comment:
   I see what you are trying to say. Well, the original premise of this ticket was to trigger an fsync the moment a configured amount of bytes has been accumulated. Here is the original description from Jason in the ticket:
   
   > In KAFKA-10601, we implemented linger semantics similar to the producer to let the leader accumulate a batch of writes before fsyncing them to disk. Currently the fsync is only based on the linger time, but it would be helpful to make it size-based as well. In other words, if we accumulate a configurable N bytes, then we should not wait for linger expiration and should just fsync immediately.
   
   But as you pointed out, it is also because in the current implementation batch append and fsync go hand in hand.
   
   With the future implementation deferring fsync, this might only affect the batch appends, and with that in mind, IMO it makes sense to rename it to `append.linger.bytes`. It also matches `append.linger.ms`.
   
   BTW, on the fsync deferral track, I created a draft PR where I have outlined my approach: https://github.com/apache/kafka/pull/10278
   
   I'd request you or Jason to review it.
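
To make the semantics concrete, here is a small self-contained sketch of the size-or-time trigger being discussed. Names such as `FlushPolicy` and `maxUnflushedBytes` are illustrative only, not the BatchAccumulator API: a flush is due when either the accumulated bytes reach the configured threshold or the linger timer expires, whichever comes first.

```java
// Hypothetical sketch of a size-or-time flush trigger; not Kafka's
// BatchAccumulator, just the decision logic under discussion.
final class FlushPolicy {
    private final long lingerMs;
    private final int maxUnflushedBytes;
    private long firstUnflushedTimeMs = -1L;
    private int unflushedBytes = 0;

    FlushPolicy(long lingerMs, int maxUnflushedBytes) {
        this.lingerMs = lingerMs;
        this.maxUnflushedBytes = maxUnflushedBytes;
    }

    /** Record an append and report whether a flush is now due. */
    boolean append(int sizeInBytes, long nowMs) {
        if (firstUnflushedTimeMs < 0)
            firstUnflushedTimeMs = nowMs;
        unflushedBytes += sizeInBytes;
        return flushIsDue(nowMs);
    }

    boolean flushIsDue(long nowMs) {
        boolean sizeReached = unflushedBytes >= maxUnflushedBytes;
        boolean lingerExpired = firstUnflushedTimeMs >= 0
            && nowMs - firstUnflushedTimeMs >= lingerMs;
        return sizeReached || lingerExpired;
    }

    void flushed() {
        unflushedBytes = 0;
        firstUnflushedTimeMs = -1L;
    }
}
```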





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

2021-03-12 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593024705



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -474,6 +474,7 @@ public void testAccumulatorClearedAfterBecomingFollower() throws Exception {
 .thenReturn(buffer);
 
 RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+.withMaxUnflushedBytes(KafkaRaftClient.MAX_BATCH_SIZE)

Review comment:
   Yeah. The reason for that is that in this PR @hachikuji had suggested to 
change the logic for setting maxBatchSize in BatchAccumulator to the following 
way: 
   
   `this.maxBatchSize = Math.min(maxBatchSize, maxUnflushedBytes);`
   
   That is the reason I am setting some combinations of value to check if it 
behaves correctly. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

2021-03-12 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593019808



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -617,6 +620,38 @@ public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Except
 assertEquals(3L, context.log.endOffset().offset);
 }
 
+@Test
+public void testChannelWokenUpIfMinFlushSizeReachedDuringAppend() throws Exception {
+// This test verifies that the client will get woken up immediately
+// if the linger timeout has expired during an append
+
+int localId = 0;
+int otherNodeId = 1;
+int minFlushSizeInBytes = 120;
+Set voters = Utils.mkSet(localId, otherNodeId);
+
+RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+.withMaxUnflushedBytes(minFlushSizeInBytes)

Review comment:
   yeah that was an oversight as well. Changed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

2021-03-12 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593019665



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -617,6 +620,38 @@ public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Except
 assertEquals(3L, context.log.endOffset().offset);
 }
 
+@Test
+public void testChannelWokenUpIfMinFlushSizeReachedDuringAppend() throws Exception {
+// This test verifies that the client will get woken up immediately
+// if the linger timeout has expired during an append

Review comment:
   yes.. changed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata

2021-03-12 Thread GitBox


vamossagar12 commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r593019531



##
File path: 
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##
@@ -1078,6 +1078,7 @@ private static FetchResponseData snapshotFetchResponse(
 private static SnapshotWriter snapshotWriter(RaftClientTestContext context, RawSnapshotWriter snapshot) {
 return new SnapshotWriter<>(
 snapshot,
+1024,

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12431) Fetch Request/Response without Topic information

2021-03-12 Thread Peter Sinoros-Szabo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Sinoros-Szabo resolved KAFKA-12431.
-
Resolution: Workaround

Closing this ticket as it seems that with proper settings it works fine.

> Fetch Request/Response without Topic information
> 
>
> Key: KAFKA-12431
> URL: https://issues.apache.org/jira/browse/KAFKA-12431
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.1
>Reporter: Peter Sinoros-Szabo
>Priority: Major
> Attachments: fetch-on-2.4.1.png, fetch-on-2.6.1.png, 
> kafka-highcpu-24.svg.zip, kafka-highcpu-26.svg.zip
>
>
> I was running a 6 node Kafka 2.4.1 cluster with protocol and message format 
> version set to 2.4. I wanted to upgrade the cluster to 2.6.1 and after I 
> upgraded the 1st broker to 2.6.1 without any configuration change, I noticed 
> much higher CPU usage on that broker (instead of 25% CPU usage it was  ~350%) 
> and about 3-4x higher network traffic. So I dumped the traffic between the 
> Kafka client and broker and compared it with the traffic of the same broker 
> downgraded to 2.4.1.
> It seems to me that after I upgraded to 2.6.1, the Fetch requests and 
> responses are not complete: they are missing the topics part of the Fetch 
> Request, I don't know for what reason. I guess there should always be a 
> topics part.
> I'll attach a screenshot of these messages.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12373) Improve KafkaRaftClient handling of graceful shutdown

2021-03-12 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300129#comment-17300129
 ] 

Sagar Rao commented on KAFKA-12373:
---

Thanks [~hachikuji], assigned it to myself

> Improve KafkaRaftClient handling of graceful shutdown
> -
>
> Key: KAFKA-12373
> URL: https://issues.apache.org/jira/browse/KAFKA-12373
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Sagar Rao
>Priority: Major
>
> The current implementation simply closes the metrics group when it is closed.
> When closing the KafkaRaftClient that is the leader it should perform at 
> least the following steps:
>  # Stop accepting new schedule append operations
>  # Append to the log the batches currently in the BatchAccumulator
>  # Wait with a timeout for the high-watermark to reach the LEO
>  # Cooperatively resign as leader from the quorum
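
As a loose, purely illustrative sketch of those four steps (none of these types or methods are the real KafkaRaftClient API), the shutdown sequence could be structured like this:

{code:java}
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch only: the four steps above expressed as a shutdown
// sequence. All names here are placeholders, not Kafka's API.
final class GracefulShutdownSketch {
    interface LeaderState {
        void stopAcceptingAppends();                     // 1. reject new schedule append calls
        void drainAccumulatorToLog();                    // 2. append pending batches to the log
        CompletableFuture<Void> highWatermarkAtLogEnd(); // 3. completes when HWM reaches the LEO
        void resign();                                   // 4. cooperatively resign from the quorum
    }

    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);

    void shutdown(LeaderState leader, Duration timeout) throws Exception {
        if (!shuttingDown.compareAndSet(false, true))
            return; // shutdown already in progress
        leader.stopAcceptingAppends();
        leader.drainAccumulatorToLog();
        try {
            leader.highWatermarkAtLogEnd().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (java.util.concurrent.TimeoutException e) {
            // Proceed with resignation even if the high-watermark did not
            // reach the log end offset within the timeout.
        }
        leader.resign();
    }
}
{code}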



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12373) Improve KafkaRaftClient handling of graceful shutdown

2021-03-12 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-12373:
-

Assignee: Sagar Rao

> Improve KafkaRaftClient handling of graceful shutdown
> -
>
> Key: KAFKA-12373
> URL: https://issues.apache.org/jira/browse/KAFKA-12373
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Sagar Rao
>Priority: Major
>
> The current implementation simply closes the metrics group when it is closed.
> When closing the KafkaRaftClient that is the leader it should perform at 
> least the following steps:
>  # Stop accepting new schedule append operations
>  # Append to the log the batches currently in the BatchAccumulator
>  # Wait with a timeout for the high-watermark to reach the LEO
>  # Cooperatively resign as leader from the quorum



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

