[GitHub] [kafka] ijuma commented on a diff in pull request #12498: KAFKA-13986; Brokers should include node.id in fetches to metadata quorum

2022-08-09 Thread GitBox


ijuma commented on code in PR #12498:
URL: https://github.com/apache/kafka/pull/12498#discussion_r942043740


##
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##
@@ -84,21 +84,21 @@ class RaftManagerTest {
   @Test
   def testSentinelNodeIdIfBrokerRoleOnly(): Unit = {

Review Comment:
   Does the test name need to change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14155) Kafka Stream - State transition from RUNNING to ERROR

2022-08-09 Thread Harsha Nadig (Jira)
Harsha Nadig created KAFKA-14155:


 Summary: Kafka Stream - State transition from RUNNING to ERROR
 Key: KAFKA-14155
 URL: https://issues.apache.org/jira/browse/KAFKA-14155
 Project: Kafka
  Issue Type: Bug
Reporter: Harsha Nadig


2022-08-09 06:05:17 [-StreamThread-1] INFO 
c.g.o.d.k.c.KafkaStreamsConfig.lambda$null$0[82] State transition from RUNNING 
to ERROR

2022-08-09 06:05:17 [-StreamThread-1] ERROR 
o.a.k.streams.KafkaStreams.maybeSetError[443] stream-client [] All 
stream threads have died. The instance will be in error state and should be 
closed.

2022-08-09 06:05:17 [] INFO 
o.a.k.s.p.i.StreamThread.completeShutdown[935] stream-thread 
[-StreamThread-1] Shutdown complete

2022-08-09 06:05:17 [-StreamThread-1] ERROR 
c.g.o.d.k.c.KafkaStreamsConfig.uncaughtException[88] StreamsThread threadId: 
-StreamThread-1

TaskManager

MetadataState:

Tasks:

throws exception:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=12_1, processor=KSTREAM-SOURCE-, topic=, partition=1, 
offset=, stacktrace=java.lang.NullPointerException

at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696)

at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)

at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)

at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)

at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)

Caused by: java.lang.NullPointerException: null



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] andymg3 commented on a diff in pull request #12499: KAFKA-14154; Ensure AlterPartition not sent to stale controller

2022-08-09 Thread GitBox


andymg3 commented on code in PR #12499:
URL: https://github.com/apache/kafka/pull/12499#discussion_r941967874


##
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##
@@ -364,15 +423,15 @@ class BrokerToControllerRequestThread(
   }
 
   override def doWork(): Unit = {
-if (activeControllerAddress().isDefined) {
+if (activeControllerOpt().isDefined) {
   super.pollOnce(Long.MaxValue)
 } else {
   debug("Controller isn't cached, looking for local metadata changes")
   controllerNodeProvider.get() match {
-case Some(controllerNode) =>
-  info(s"Recorded new controller, from now on will use broker 
$controllerNode")
-  updateControllerAddress(controllerNode)
-  metadataUpdater.setNodes(Seq(controllerNode).asJava)
+case Some(controllerNodeAndEpoch) =>

Review Comment:
   Is this where/how the `LeaderAndIsr` from the new controller change 
eventually gets applied? 



##
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##
@@ -364,15 +423,15 @@ class BrokerToControllerRequestThread(
   }
 
   override def doWork(): Unit = {
-if (activeControllerAddress().isDefined) {
+if (activeControllerOpt().isDefined) {
   super.pollOnce(Long.MaxValue)
 } else {
   debug("Controller isn't cached, looking for local metadata changes")
   controllerNodeProvider.get() match {
-case Some(controllerNode) =>
-  info(s"Recorded new controller, from now on will use broker 
$controllerNode")
-  updateControllerAddress(controllerNode)
-  metadataUpdater.setNodes(Seq(controllerNode).asJava)
+case Some(controllerNodeAndEpoch) =>

Review Comment:
   Is this where/how the `LeaderAndIsr` from the new controller eventually gets 
applied? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] andymg3 commented on a diff in pull request #12499: KAFKA-14154; Ensure AlterPartition not sent to stale controller

2022-08-09 Thread GitBox


andymg3 commented on code in PR #12499:
URL: https://github.com/apache/kafka/pull/12499#discussion_r941967046


##
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##
@@ -317,21 +373,24 @@ class BrokerToControllerRequestThread(
   override def generateRequests(): Iterable[RequestAndCompletionHandler] = {
 val currentTimeMs = time.milliseconds()
 val requestIter = requestQueue.iterator()
+val controllerOpt = activeControllerOpt()
+
 while (requestIter.hasNext) {
   val request = requestIter.next
   if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) {
 requestIter.remove()
 request.callback.onTimeout()
   } else {
-val controllerAddress = activeControllerAddress()
-if (controllerAddress.isDefined) {
-  requestIter.remove()
-  return Some(RequestAndCompletionHandler(
-time.milliseconds(),
-controllerAddress.get,
-request.request,
-handleResponse(request)
-  ))
+controllerOpt.foreach { activeController =>
+  if (activeController.epoch >= request.minControllerEpoch) {

Review Comment:
   To confirm, this check is done on the broker side, right? I guess you sort of 
allude to this in the PR description: potentially a more ideal solution would be 
for the controller to do the check server-side, but that would require a version 
bump. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji opened a new pull request, #12499: KAFKA-14154; Ensure AlterPartition not sent to stale controller

2022-08-09 Thread GitBox


hachikuji opened a new pull request, #12499:
URL: https://github.com/apache/kafka/pull/12499

   It is possible currently for a leader to send an `AlterPartition` request to 
a stale controller which does not have the latest leader epoch discovered 
through a `LeaderAndIsr` request. In this case, the stale controller returns 
`FENCED_LEADER_EPOCH`, which causes the partition leader to get stuck. This is 
a change in behavior following https://github.com/apache/kafka/pull/12032. 
Prior to that patch, the request would either be accepted (potentially 
incorrectly) if the `LeaderAndIsr` state matched that on the controller, or it 
would have returned `NOT_CONTROLLER`. 
   
   This patch fixes the problem by ensuring that `AlterPartition` is sent to a 
controller with an epoch which is at least as large as that of the controller 
which sent the `LeaderAndIsr` request. This ensures that the 
`FENCED_LEADER_EPOCH` error from the controller can be trusted.
   
   A more elegant solution to this problem would probably be to include the 
controller epoch in the `AlterPartition` request, but this would require a 
version bump. Alternatively, we considered letting the controller return 
`UNKNOWN_LEADER_EPOCH` instead of `FENCED_LEADER_EPOCH` when the epoch is 
larger than what it has in its context. This too likely would require a version 
bump. Finally, we considered reverting 
https://github.com/apache/kafka/pull/12032, which would restore the looser 
validation logic which allows the controller to accept `AlterPartition` 
requests with larger leader epochs. We rejected this option because we feel it 
can lead to correctness violations.
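
   A minimal sketch of the gating idea (the type names below are illustrative 
stand-ins, not the actual patch): a queued `AlterPartition` request is only 
dispatched once the cached controller's epoch is at least the epoch the leader 
learned from the `LeaderAndIsr` request.

```scala
// Illustrative sketch only; ControllerInformation and PendingRequest are
// hypothetical stand-ins for the real BrokerToControllerChannelManager types.
case class ControllerInformation(host: String, epoch: Int)
case class PendingRequest(minControllerEpoch: Int)

def readyToSend(controllerOpt: Option[ControllerInformation], request: PendingRequest): Boolean =
  // Only dispatch when the known controller is at least as new as the one that
  // sent the LeaderAndIsr; otherwise the request stays queued until a newer
  // controller is discovered, so a FENCED_LEADER_EPOCH response can be trusted.
  controllerOpt.exists(_.epoch >= request.minControllerEpoch)
```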
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14081) Cannot get my MetricsReporter implementation to receive meaningful metrics

2022-08-09 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17577677#comment-17577677
 ] 

Rens Groothuijsen commented on KAFKA-14081:
---

I had a look at the source code, and it does appear that {{metricChange()}} is 
only being called when adding a metric, not when a metric changes. Given that 
the documentation of {{metricChange()}} says otherwise, I assume this was 
accidentally removed at some point.

> Cannot get my MetricsReporter implementation to receive meaningful metrics
> --
>
> Key: KAFKA-14081
> URL: https://issues.apache.org/jira/browse/KAFKA-14081
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Gian Luca
>Priority: Minor
>
> I want to extract metrics from KafkaProducer to export them to our company 
> monitoring solution. At first I went for implementing {{MetricsReporter}} and 
> registering my implementation through the "metric.reporters" config property. 
> The class is correctly registered as it receives metric updates through 
> {{metricChange()}} while KafkaProducer is being used. The problem is that all 
> the metric values are stuck at zero (NaN in older versions of Kafka), even 
> the most trivial (e.g. 'record-send-total').
> If instead of using a reporter I simply poll the {{metrics()}} method of the 
> KafkaProducer, then I see meaningful values: counters increasing over time, 
> etc.
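
For readers hitting the same symptom, a hedged Scala sketch of the polling approach 
described above (the metric-name filter and the `producer` instance are assumptions): 
read live values from `producer.metrics()` rather than relying on values captured in 
`metricChange()`.

```scala
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.producer.KafkaProducer

// Hedged sketch: `producer` is an assumed, already-configured KafkaProducer.
// metrics() exposes the current registry; metricValue() returns the live reading.
def dumpSendTotals(producer: KafkaProducer[String, String]): Unit =
  producer.metrics().asScala.foreach { case (name, metric) =>
    if (name.name() == "record-send-total")
      println(s"${name.group()}/${name.name()} = ${metric.metricValue()}")
  }
```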



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14154) Persistent URP after controller soft failure

2022-08-09 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14154:

Description: 
We ran into a scenario where a partition leader was unable to expand the ISR 
after a soft controller failover. Here is what happened:

Initial state: leader=1, isr=[1,2], leader epoch=10. Broker 1 is acting as the 
current controller.

1. Broker 1 loses its session in Zookeeper.  

2. Broker 2 becomes the new controller.

3. During initialization, controller 2 removes 1 from the ISR. So state is 
updated: leader=2, isr=[2], leader epoch=11.

4. Broker 2 receives `LeaderAndIsr` from the new controller with leader 
epoch=11.

5. Broker 2 immediately tries to add replica 1 back to the ISR since it is 
still fetching and is caught up. However, the 
`BrokerToControllerChannelManager` is still pointed at controller 1, so that is 
where the `AlterPartition` is sent.

6. Controller 1 does not yet realize that it is not the controller, so it 
processes the `AlterPartition` request. It sees the leader epoch of 11, which 
is higher than what it has in its own context. Following changes to the 
`AlterPartition` validation in 
[https://github.com/apache/kafka/pull/12032/files,] the controller returns 
FENCED_LEADER_EPOCH.

7. After receiving the FENCED_LEADER_EPOCH from the old controller, the leader 
is stuck because it assumes that the error implies that another LeaderAndIsr 
request should be sent.

Prior to 
[https://github.com/apache/kafka/pull/12032/files|https://github.com/apache/kafka/pull/12032/files,],
 the way we handled this case was a little different. We only verified that the 
leader epoch in the request was at least as large as the current epoch in the 
controller context. Anything higher was accepted. The controller would have 
attempted to write the updated state to Zookeeper. This update would have 
failed because of the controller epoch check, however, we would have returned 
NOT_CONTROLLER in this case, which is handled in `AlterPartitionManager`.

It is tempting to revert the logic, but the risk is in the idempotency check: 
[https://github.com/apache/kafka/pull/12032/files#diff-3e042c962e80577a4cc9bbcccf0950651c6b312097a86164af50003c00c50d37L2369.]
 If the AlterPartition request happened to match the state inside the old 
controller, the controller would consider the update successful and return no 
error. But if its state was already stale at that point, then that might cause 
the leader to incorrectly assume that the state had been updated.

One way to fix this problem without weakening the validation is to rely on the 
controller epoch in `AlterPartitionManager`. When we discover a new controller, 
we also discover its epoch, so we can pass that through. The `LeaderAndIsr` 
request already includes the controller epoch of the controller that sent it 
and we already propagate this through to `AlterPartition.submit`. Hence all we 
need to do is verify that the epoch of the current controller target is at 
least as large as the one discovered through the `LeaderAndIsr`.

  was:
We ran into a scenario where a partition leader was unable to expand the ISR 
after a soft controller failover. Here is what happened:

Initial state: leader=1, isr=[1,2], leader epoch=10. Broker 1 is acting as the 
current controller.

1. Broker 1 loses its session in Zookeeper.  

2. Broker 2 becomes the new controller.

3. During initialization, controller 2 removes 1 from the ISR. So state is 
updated: leader=2, isr=[1, 2], leader epoch=11.

4. Broker 2 receives `LeaderAndIsr` from the new controller with leader 
epoch=11.

5. Broker 2 immediately tries to add replica 1 back to the ISR since it is 
still fetching and is caught up. However, the 
`BrokerToControllerChannelManager` is still pointed at controller 1, so that is 
where the `AlterPartition` is sent.

6. Controller 1 does not yet realize that it is not the controller, so it 
processes the `AlterPartition` request. It sees the leader epoch of 11, which 
is higher than what it has in its own context. Following changes to the 
`AlterPartition` validation in 
[https://github.com/apache/kafka/pull/12032/files,] the controller returns 
FENCED_LEADER_EPOCH.

7. After receiving the FENCED_LEADER_EPOCH from the old controller, the leader 
is stuck because it assumes that the error implies that another LeaderAndIsr 
request should be sent.

Prior to 
[https://github.com/apache/kafka/pull/12032/files|https://github.com/apache/kafka/pull/12032/files,],
 the way we handled this case was a little different. We only verified that the 
leader epoch in the request was at least as large as the current epoch in the 
controller context. Anything higher was accepted. The controller would have 
attempted to write the updated state to Zookeeper. This update would have 
failed because of the controller epoch check, however, we would have returned 
NOT_CONTROLLER 

[GitHub] [kafka] guozhangwang commented on pull request #12497: KAFKA-10199: Expose read only task from state updater

2022-08-09 Thread GitBox


guozhangwang commented on PR #12497:
URL: https://github.com/apache/kafka/pull/12497#issuecomment-1210002259

   The failures in `DefaultStateUpdaterTest` seem relevant?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14154) Persistent URP after controller soft failure

2022-08-09 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14154:

Fix Version/s: 3.3.0

> Persistent URP after controller soft failure
> 
>
> Key: KAFKA-14154
> URL: https://issues.apache.org/jira/browse/KAFKA-14154
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.3.0
>
>
> We ran into a scenario where a partition leader was unable to expand the ISR 
> after a soft controller failover. Here is what happened:
> Initial state: leader=1, isr=[1,2], leader epoch=10. Broker 1 is acting as 
> the current controller.
> 1. Broker 1 loses its session in Zookeeper.  
> 2. Broker 2 becomes the new controller.
> 3. During initialization, controller 2 removes 1 from the ISR. So state is 
> updated: leader=2, isr=[1, 2], leader epoch=11.
> 4. Broker 2 receives `LeaderAndIsr` from the new controller with leader 
> epoch=11.
> 5. Broker 2 immediately tries to add replica 1 back to the ISR since it is 
> still fetching and is caught up. However, the 
> `BrokerToControllerChannelManager` is still pointed at controller 1, so that 
> is where the `AlterPartition` is sent.
> 6. Controller 1 does not yet realize that it is not the controller, so it 
> processes the `AlterPartition` request. It sees the leader epoch of 11, which 
> is higher than what it has in its own context. Following changes to the 
> `AlterPartition` validation in 
> [https://github.com/apache/kafka/pull/12032/files,] the controller returns 
> FENCED_LEADER_EPOCH.
> 7. After receiving the FENCED_LEADER_EPOCH from the old controller, the 
> leader is stuck because it assumes that the error implies that another 
> LeaderAndIsr request should be sent.
> Prior to 
> [https://github.com/apache/kafka/pull/12032/files|https://github.com/apache/kafka/pull/12032/files,],
>  the way we handled this case was a little different. We only verified that 
> the leader epoch in the request was at least as large as the current epoch in 
> the controller context. Anything higher was accepted. The controller would 
> have attempted to write the updated state to Zookeeper. This update would 
> have failed because of the controller epoch check, however, we would have 
> returned NOT_CONTROLLER in this case, which is handled in 
> `AlterPartitionManager`.
> It is tempting to revert the logic, but the risk is in the idempotency check: 
> [https://github.com/apache/kafka/pull/12032/files#diff-3e042c962e80577a4cc9bbcccf0950651c6b312097a86164af50003c00c50d37L2369.]
>  If the AlterPartition request happened to match the state inside the old 
> controller, the controller would consider the update successful and return no 
> error. But if its state was already stale at that point, then that might 
> cause the leader to incorrectly assume that the state had been updated.
> One way to fix this problem without weakening the validation is to rely on 
> the controller epoch in `AlterPartitionManager`. When we discover a new 
> controller, we also discover its epoch, so we can pass that through. The 
> `LeaderAndIsr` request already includes the controller epoch of the 
> controller that sent it and we already propagate this through to 
> `AlterPartition.submit`. Hence all we need to do is verify that the epoch of 
> the current controller target is at least as large as the one discovered 
> through the `LeaderAndIsr`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14154) Persistent URP after controller soft failure

2022-08-09 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14154:
---

 Summary: Persistent URP after controller soft failure
 Key: KAFKA-14154
 URL: https://issues.apache.org/jira/browse/KAFKA-14154
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


We ran into a scenario where a partition leader was unable to expand the ISR 
after a soft controller failover. Here is what happened:

Initial state: leader=1, isr=[1,2], leader epoch=10. Broker 1 is acting as the 
current controller.

1. Broker 1 loses its session in Zookeeper.  

2. Broker 2 becomes the new controller.

3. During initialization, controller 2 removes 1 from the ISR. So state is 
updated: leader=2, isr=[1, 2], leader epoch=11.

4. Broker 2 receives `LeaderAndIsr` from the new controller with leader 
epoch=11.

5. Broker 2 immediately tries to add replica 1 back to the ISR since it is 
still fetching and is caught up. However, the 
`BrokerToControllerChannelManager` is still pointed at controller 1, so that is 
where the `AlterPartition` is sent.

6. Controller 1 does not yet realize that it is not the controller, so it 
processes the `AlterPartition` request. It sees the leader epoch of 11, which 
is higher than what it has in its own context. Following changes to the 
`AlterPartition` validation in 
[https://github.com/apache/kafka/pull/12032/files,] the controller returns 
FENCED_LEADER_EPOCH.

7. After receiving the FENCED_LEADER_EPOCH from the old controller, the leader 
is stuck because it assumes that the error implies that another LeaderAndIsr 
request should be sent.

Prior to 
[https://github.com/apache/kafka/pull/12032/files|https://github.com/apache/kafka/pull/12032/files,],
 the way we handled this case was a little different. We only verified that the 
leader epoch in the request was at least as large as the current epoch in the 
controller context. Anything higher was accepted. The controller would have 
attempted to write the updated state to Zookeeper. This update would have 
failed because of the controller epoch check, however, we would have returned 
NOT_CONTROLLER in this case, which is handled in `AlterPartitionManager`.

It is tempting to revert the logic, but the risk is in the idempotency check: 
[https://github.com/apache/kafka/pull/12032/files#diff-3e042c962e80577a4cc9bbcccf0950651c6b312097a86164af50003c00c50d37L2369.]
 If the AlterPartition request happened to match the state inside the old 
controller, the controller would consider the update successful and return no 
error. But if its state was already stale at that point, then that might cause 
the leader to incorrectly assume that the state had been updated.

One way to fix this problem without weakening the validation is to rely on the 
controller epoch in `AlterPartitionManager`. When we discover a new controller, 
we also discover its epoch, so we can pass that through. The `LeaderAndIsr` 
request already includes the controller epoch of the controller that sent it 
and we already propagate this through to `AlterPartition.submit`. Hence all we 
need to do is verify that the epoch of the current controller target is at 
least as large as the one discovered through the `LeaderAndIsr`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ahuang98 commented on a diff in pull request #12479: MINOR; Convert some integration tests to run with the KRaft modes

2022-08-09 Thread GitBox


ahuang98 commented on code in PR #12479:
URL: https://github.com/apache/kafka/pull/12479#discussion_r941879338


##
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##
@@ -20,23 +20,37 @@ import java.util
 import java.util.concurrent.ExecutionException
 import java.util.{Collections, Optional, Properties}
 
-import scala.collection.Seq
-import kafka.log.UnifiedLog
-import kafka.zk.TopicPartitionZNode
-import kafka.utils.TestUtils
-import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, Test}
 import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.controller.{OfflineReplica, PartitionAndReplica, 
ReplicaAssignment, ReplicaDeletionSuccessful}
+import kafka.log.UnifiedLog
+import kafka.server
+import kafka.server.{KafkaBroker, KafkaConfig, KafkaServer, QuorumTestHarness}
+import kafka.utils.Implicits.PropertiesOps
+import kafka.utils.{TestInfoUtils, TestUtils}
+import kafka.zk.TopicPartitionZNode
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
NewPartitionReassignment, NewPartitions}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo}
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.jupiter.api.Assertions.{assertTrue, _}
+import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.api.{AfterEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+import org.opentest4j.AssertionFailedError
+
+import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class DeleteTopicTest extends QuorumTestHarness {
 
-  var servers: Seq[KafkaServer] = Seq()
+  var brokers: Seq[KafkaBroker] = Seq()

Review Comment:
   I thought it would be better to bring up brokers manually for each test 
since many require different sets of configs (KRaft doesn't yet support dynamic 
reconfiguration) and different numbers of brokers (we _could_ just always bring 
up the max required).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #12498: KAFKA-13986; Brokers should include node.id in fetches to metadata quorum

2022-08-09 Thread GitBox


hachikuji commented on PR #12498:
URL: https://github.com/apache/kafka/pull/12498#issuecomment-1209985247

   cc @dengziming @niket-goel @jsancio 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji opened a new pull request, #12498: KAFKA-13986; Brokers should include node.id in fetches to metadata quorum

2022-08-09 Thread GitBox


hachikuji opened a new pull request, #12498:
URL: https://github.com/apache/kafka/pull/12498

   Currently we do not set the replicaId in fetches from brokers to the 
metadata quorum. It is useful to do so since that allows us to debug 
replication using the `DescribeQuorum` API.
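
   As a rough illustration of the benefit (a hedged sketch, not part of the patch; 
the Admin calls follow KIP-836 and `admin` is an assumption), replication progress 
then shows up per replica id in the `DescribeQuorum` output:

```scala
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.Admin

// Hedged sketch: `admin` is an assumed Admin client for the cluster. Broker
// observers are only attributable in this output when their fetches carry a
// real replica id rather than the sentinel.
def printQuorumReplication(admin: Admin): Unit = {
  val info = admin.describeMetadataQuorum().quorumInfo().get()
  (info.voters().asScala ++ info.observers().asScala).foreach { replica =>
    println(s"replica=${replica.replicaId()} logEndOffset=${replica.logEndOffset()}")
  }
}
```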
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #12458: MINOR: Adds KRaft versions of most streams system tests

2022-08-09 Thread GitBox


guozhangwang commented on PR #12458:
URL: https://github.com/apache/kafka/pull/12458#issuecomment-1209983916

   Re-triggered the jenkins build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12458: MINOR: Adds KRaft versions of most streams system tests

2022-08-09 Thread GitBox


guozhangwang commented on code in PR #12458:
URL: https://github.com/apache/kafka/pull/12458#discussion_r941876941


##
tests/kafkatest/tests/streams/streams_broker_bounce_test.py:
##
@@ -205,11 +212,17 @@ def collect_results(self, sleep_time_secs):
 return data
 
 @cluster(num_nodes=7)
+@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", 
"hard_bounce"],
+broker_type=["leader"],
+num_threads=[1, 3],
+sleep_time_secs=[120],
+metadata_quorum=[quorum.remote_kraft])

Review Comment:
   Cool, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14153) UnknownTopicOrPartitionException should include the topic/partition in the returned exception message

2022-08-09 Thread Alyssa Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alyssa Huang updated KAFKA-14153:
-
Description: 
Exception would be more useful if it included the topic or partition that was 
not found. Message right now is just 
`This server does not host this topic-partition.`
Background: [https://github.com/apache/kafka/pull/12479#discussion_r938988993]

  was:
Exception would be more useful if it included the topic or partition that was 
not found.
Background: https://github.com/apache/kafka/pull/12479#discussion_r938988993


> UnknownTopicOrPartitionException should include the topic/partition in the 
> returned exception message
> -
>
> Key: KAFKA-14153
> URL: https://issues.apache.org/jira/browse/KAFKA-14153
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Alyssa Huang
>Priority: Minor
>
> Exception would be more useful if it included the topic or partition that was 
> not found. Message right now is just 
> `This server does not host this topic-partition.`
> Background: [https://github.com/apache/kafka/pull/12479#discussion_r938988993]
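
A hedged sketch of the shape of the improvement (the helper and call site are 
hypothetical, not an actual patch): build the message from the `TopicPartition` 
at the point where the error is raised.

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException

// Hypothetical helper; the real change would live wherever the broker raises the
// error, but the message shape is the point of the sketch.
def unknownTopicOrPartition(tp: TopicPartition): UnknownTopicOrPartitionException =
  new UnknownTopicOrPartitionException(s"This server does not host the topic-partition $tp.")
```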



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

2022-08-09 Thread GitBox


jolshan commented on PR #12487:
URL: https://github.com/apache/kafka/pull/12487#issuecomment-1209962842

   Failed tests passed locally:
   
   [Build / JDK 8 and Scala 2.12 / 
kafka.admin.DeleteOffsetsConsumerGroupCommandIntegrationTest.testDeleteOffsetsNonExistingGroup()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12487/6/testReport/junit/kafka.admin/DeleteOffsetsConsumerGroupCommandIntegrationTest/Build___JDK_8_and_Scala_2_12___testDeleteOffsetsNonExistingGroup__/)
   [Build / JDK 8 and Scala 2.12 / 
kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12487/6/testReport/junit/kafka.server/KRaftClusterTest/Build___JDK_8_and_Scala_2_12___testCreateClusterAndPerformReassignment__/)
   [Build / JDK 11 and Scala 2.13 / 
kafka.log.LogCleanerIntegrationTest.testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12487/6/testReport/junit/kafka.log/LogCleanerIntegrationTest/Build___JDK_11_and_Scala_2_13___testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics__/)
   [Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12487/6/testReport/junit/org.apache.kafka.connect.integration/ConnectorRestartApiIntegrationTest/Build___JDK_17_and_Scala_2_13___testMultiWorkerRestartOnlyConnector/)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #12448: KAFKA-14114: Adding Metadata Log Processing Error Related Metrics

2022-08-09 Thread GitBox


cmccabe commented on PR #12448:
URL: https://github.com/apache/kafka/pull/12448#issuecomment-1209956699

   Merged from command line with the changes we discussed. Thanks, @niket-goel 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe closed pull request #12496: KAFKA-14114: Add Metadata Error Related Metrics

2022-08-09 Thread GitBox


cmccabe closed pull request #12496: KAFKA-14114: Add Metadata Error Related 
Metrics
URL: https://github.com/apache/kafka/pull/12496


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14153) UnknownTopicOrPartitionException should include the topic/partition in the returned exception message

2022-08-09 Thread Alyssa Huang (Jira)
Alyssa Huang created KAFKA-14153:


 Summary: UnknownTopicOrPartitionException should include the 
topic/partition in the returned exception message
 Key: KAFKA-14153
 URL: https://issues.apache.org/jira/browse/KAFKA-14153
 Project: Kafka
  Issue Type: Improvement
Reporter: Alyssa Huang


Exception would be more useful if it included the topic or partition that was 
not found.
Background: https://github.com/apache/kafka/pull/12479#discussion_r938988993



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ahuang98 commented on a diff in pull request #12479: MINOR; Convert some integration tests to run with the KRaft modes

2022-08-09 Thread GitBox


ahuang98 commented on code in PR #12479:
URL: https://github.com/apache/kafka/pull/12479#discussion_r941848002


##
core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala:
##
@@ -37,29 +43,57 @@ class RackAwareAutoTopicCreationTest extends 
KafkaServerTestHarness with RackAwa
 
   def generateConfigs =
 (0 until numServers) map { node =>
-  TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = 
false, rack = Some((node / 2).toString))
+  TestUtils.createBrokerConfig(node, zkConnectOrNull, 
enableControlledShutdown = false, rack = Some((node / 2).toString))
 } map (KafkaConfig.fromProps(_, overridingProps))
 
   private val topic = "topic"
 
-  @Test
-  def testAutoCreateTopic(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAutoCreateTopic(quorum: String): Unit = {
 val producer = TestUtils.createProducer(bootstrapServers())
+val props = new Properties()
+props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
+val adminClient = Admin.create(props)
+
+TestUtils.waitUntilTrue(
+  () => brokers.head.metadataCache.getAliveBrokers().size == numServers,
+  "Timed out waiting for all brokers to become unfenced")
+
 try {
   // Send a message to auto-create the topic
   val record = new ProducerRecord(topic, null, "key".getBytes, 
"value".getBytes)
   assertEquals(0L, producer.send(record).get.offset, "Should have offset 
0")
 
-  // double check that the topic is created with leader elected
-  TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-  val assignment = zkClient.getReplicaAssignmentForTopics(Set(topic)).map 
{ case (topicPartition, replicas) =>
-topicPartition.partition -> replicas
+  val partition = 
adminClient.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic).get().
+partitions().stream().filter(_.partition == 0).findAny()
+  assertTrue(partition.isPresent, "Partition [topic,0] should exist")
+  assertFalse(partition.get().leader().isEmpty, "Leader should exist for 
partition [topic,0]")
+
+  val assignment = 
adminClient.describeTopics(Collections.singleton(topic)).topicNameValues.asScala.map
 {
+case (topicName, topicDescriptionFuture) =>
+  try topicName -> topicDescriptionFuture.get
+  catch {
+case t: ExecutionException if 
t.getCause.isInstanceOf[UnknownTopicOrPartitionException] =>
+  throw new ExecutionException(
+new UnknownTopicOrPartitionException(s"Topic $topicName not 
found."))

Review Comment:
   https://issues.apache.org/jira/browse/KAFKA-14153



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14069) Allow custom configuration of foreign key join internal topics

2022-08-09 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17577634#comment-17577634
 ] 

Guozhang Wang commented on KAFKA-14069:
---

That's interesting.. could you check a few things additionally?

1) does your app commit successfully from time to time (the delete-records 
request should be sent aligned with the commit)?
2) did you see the following log-lines in your logs?

```
"Sent delete-records request: {}"
```

```
"Previous delete-records request has failed: {}. Try sending the new request 
now"
```

> Allow custom configuration of foreign key join internal topics
> --
>
> Key: KAFKA-14069
> URL: https://issues.apache.org/jira/browse/KAFKA-14069
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Emmanuel Brard
>Priority: Minor
>
> Internal topics supporting foreign key joins (-subscription-registration-topic 
> and -subscription-response-topic) are automatically created with _infinite 
> retention_ (retention.ms=-1, retention.bytes=-1).
> As far as I understand, those topics are used for communication between tasks 
> that are involved in the FK join; the intermediate result, though, is persisted 
> in a compacted topic (-subscription-store-changelog).
> This means, if I understood right, that during normal operation of the stream 
> application, once a message is read from the registration/subscription topic, 
> it will not be read again, even in case of recovery (the position in those 
> topics is committed).
> Because we have very large tables being joined this way with very high 
> change frequency, we end up with FK internal topics in the order of 1 or 2 
> TB. This is complicated to maintain, especially in terms of disk space.
> I was wondering if:
> - this infinite retention is really a required configuration, and if not
> - this infinite retention could be replaced with a configurable one (for 
> example 1 week, meaning that I accept that in case of failure I must recover 
> my app within one week)
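
Not part of the original report: pending the questions above about the delete-records 
purge, a hedged workaround sketch (the topic name and retention value are assumptions) 
that caps retention on an FK internal topic with the Admin client, since these are 
regular topics once created.

```scala
import java.util.{Collection => JCollection, Collections, HashMap => JHashMap}
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

// Hedged sketch: `admin` is an assumed Admin client; the topic name follows the
// "<application.id>-...-subscription-registration-topic" naming pattern.
def capRetention(admin: Admin, internalTopic: String, retentionMs: Long = 604800000L): Unit = {
  val resource = new ConfigResource(ConfigResource.Type.TOPIC, internalTopic)
  val setRetention = new AlterConfigOp(
    new ConfigEntry("retention.ms", retentionMs.toString), AlterConfigOp.OpType.SET)
  val configs = new JHashMap[ConfigResource, JCollection[AlterConfigOp]]()
  configs.put(resource, Collections.singletonList(setRetention))
  admin.incrementalAlterConfigs(configs).all().get()
}
```

If the commit-aligned delete-records purges asked about above are working, the 
override should rarely matter; any cap simply trades disk space for a bounded 
recovery window.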



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14070) Improve documentation for queryMetadataForKey for state stores with Processor API

2022-08-09 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17577633#comment-17577633
 ] 

Guozhang Wang commented on KAFKA-14070:
---

Hello [~balajirrao] Thanks for the updated description! That's much clearer 
now. Yes the `queryMetadataForKey` overloaded func without the 
{{StreamPartitioner}} does not work very well with the processor API since it's 
assuming the key of the store is inherited from the key of the input (or 
repartition) topic. If the user is storing the key in a different manner, they'd 
need to use the other overloaded func that requires {{StreamPartitioner}}. But 
like you brought up in the second example, if the partitioning scheme is not 
dependent on the key (e.g. if it's by the value) then that function does not 
help either. I think in the near term it's definitely necessary to improve our 
docs clarifying that for the PAPI users --- would you like to file a PR?

In the long run, we should consider generalizing this function to allow users 
to provide any form of partitioning scheme.
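
To make the near-term doc guidance concrete, a hedged Scala sketch (the store name and 
serde match the first example in the ticket; `streams` is an assumed running instance): 
query with the input record's key, not the derived store key.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, KeyQueryMetadata}

// Hedged sketch: the store key "1-a" cannot locate the partition directly; the Int
// input key that produced it (here: 1) can, because partitioning follows the input
// topic's key.
def whereIsStoreKey(streams: KafkaStreams, inputKey: Int): KeyQueryMetadata =
  streams.queryMetadataForKey("store", Int.box(inputKey), Serdes.Integer().serializer())
```

For the second example, where partitioning does not depend on the key at all, neither 
overload helps, which is exactly the documentation gap discussed above.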

> Improve documentation for queryMetadataForKey for state stores with Processor 
> API
> -
>
> Key: KAFKA-14070
> URL: https://issues.apache.org/jira/browse/KAFKA-14070
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Balaji Rao
>Priority: Minor
>
> Using {{queryMetadataForKey}} for state stores with Processor API is tricky. 
> One could use state stores in Processor API in ways that would make it 
> impossible to use {{queryMetadataForKey}} with just a key alone - one would 
> have to know the input record's key. This could lead to the method being 
> called with incorrect expectations. The documentation could be improved 
> around this, and around using state stores with the Processor API in general.
> Example Scala snippet:
> {code:scala}
> val input = streamsBuilder.stream(
> "input-topic",
> Consumed.`with`(Serdes.intSerde, Serdes.stringSerde)
>   )
>   private val storeBuilder = Stores
> .keyValueStoreBuilder[String, String](
>   Stores.inMemoryKeyValueStore("store"),
>   Serdes.stringSerde,
>   Serdes.stringSerde
> )
>   streamsBuilder.addStateStore(storeBuilder)
>   input.process(
> new ProcessorSupplier[Int, String, Void, Void] {
>   override def get(): Processor[Int, String, Void, Void] =
> new Processor[Int, String, Void, Void] {
>   var store: KeyValueStore[String, String] = _
>   override def init(context: ProcessorContext[Void, Void]): Unit = {
> super.init(context)
> store = context.getStateStore("store")
>   }
>   override def process(record: Record[Int, String]): Unit = {
> ('a' to 'j').foreach(x =>
>   store.put(s"${record.key}-$x", record.value)
> )
>   }
> }
> },
> "store"
>   )
> {code}
> In the code sample above, AFAICT there is no way the possible partition of 
> the {{store}} containing the key {{"1-a"}} could be determined by calling 
> {{queryMetadataForKey}} with the string {{{}"1-a"{}}}. One has to call 
> {{queryMetadataForKey}} with the record's key that produced {{{}"1-a"{}}}, in 
> this case the {{Int}} 1, to find the partition.
>  
> Example 2:
> The same as above, but with a different {{process}} method.
> {code:scala}
> override def process(record: Record[Int, String]): Unit = {
>   ('a' to 'j').foreach(x => store.put(s"$x", s"${record.key}"))
> }{code}
> In this case the key {{"a"}} could exist in multiple partitions, with 
> different values in different partitions. In this case, AFAICT, one must use 
> {{queryMetadataForKey}} with an {{Int}} to determine the partition where a 
> given {{String}} would be stored.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14152) Add logic to fence kraft brokers which have fallen behind in replication

2022-08-09 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14152:
---

 Summary: Add logic to fence kraft brokers which have fallen behind 
in replication
 Key: KAFKA-14152
 URL: https://issues.apache.org/jira/browse/KAFKA-14152
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


When a kraft broker registers with the controller, it must catch up to the 
current metadata before it is unfenced. However, once it has been unfenced, it 
only needs to continue sending heartbeats to remain unfenced. It can fall 
arbitrarily behind in the replication of the metadata log and remain unfenced. 
We should consider whether there is an inverse condition that we can use to 
fence a broker that has fallen behind.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ahuang98 commented on a diff in pull request #12479: MINOR; Convert some integration tests to run with the KRaft modes

2022-08-09 Thread GitBox


ahuang98 commented on code in PR #12479:
URL: https://github.com/apache/kafka/pull/12479#discussion_r941771144


##
core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala:
##
@@ -37,29 +43,57 @@ class RackAwareAutoTopicCreationTest extends 
KafkaServerTestHarness with RackAwa
 
   def generateConfigs =
 (0 until numServers) map { node =>
-  TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = 
false, rack = Some((node / 2).toString))
+  TestUtils.createBrokerConfig(node, zkConnectOrNull, 
enableControlledShutdown = false, rack = Some((node / 2).toString))
 } map (KafkaConfig.fromProps(_, overridingProps))
 
   private val topic = "topic"
 
-  @Test
-  def testAutoCreateTopic(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAutoCreateTopic(quorum: String): Unit = {
 val producer = TestUtils.createProducer(bootstrapServers())
+val props = new Properties()
+props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
+val adminClient = Admin.create(props)
+
+TestUtils.waitUntilTrue(
+  () => brokers.head.metadataCache.getAliveBrokers().size == numServers,
+  "Timed out waiting for all brokers to become unfenced")
+
 try {
   // Send a message to auto-create the topic
   val record = new ProducerRecord(topic, null, "key".getBytes, 
"value".getBytes)
   assertEquals(0L, producer.send(record).get.offset, "Should have offset 
0")
 
-  // double check that the topic is created with leader elected
-  TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-  val assignment = zkClient.getReplicaAssignmentForTopics(Set(topic)).map 
{ case (topicPartition, replicas) =>
-topicPartition.partition -> replicas
+  val partition = 
adminClient.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic).get().
+partitions().stream().filter(_.partition == 0).findAny()
+  assertTrue(partition.isPresent, "Partition [topic,0] should exist")
+  assertFalse(partition.get().leader().isEmpty, "Leader should exist for 
partition [topic,0]")
+
+  val assignment = 
adminClient.describeTopics(Collections.singleton(topic)).topicNameValues.asScala.map
 {
+case (topicName, topicDescriptionFuture) =>
+  try topicName -> topicDescriptionFuture.get
+  catch {
+case t: ExecutionException if 
t.getCause.isInstanceOf[UnknownTopicOrPartitionException] =>
+  throw new ExecutionException(
+new UnknownTopicOrPartitionException(s"Topic $topicName not 
found."))
+  }
+  }.flatMap {
+case (_, topicDescription) => topicDescription.partitions.asScala.map 
{ info =>
+  (info.partition, info.replicas.asScala.map(_.id))
+}
   }
-  val brokerMetadatas = 
adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced)
+
+  val brokerMetadatas = brokers.head.metadataCache.getAliveBrokers().toList
   val expectedMap = Map(0 -> "0", 1 -> "0", 2 -> "1", 3 -> "1")
   assertEquals(expectedMap, brokerMetadatas.map(b => b.id -> 
b.rack.get).toMap)
-  checkReplicaDistribution(assignment, expectedMap, numServers, 
numPartitions, replicationFactor)
-} finally producer.close()
+  checkReplicaDistribution(assignment, expectedMap, numServers, 
numPartitions, replicationFactor,
+verifyLeaderDistribution = false)

Review Comment:
   Still investigating this one, I believe @rondagostino is taking a look



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #12486: MINOR: BrokerMetadataSnapshotter must avoid exceeding batch size

2022-08-09 Thread GitBox


cmccabe merged PR #12486:
URL: https://github.com/apache/kafka/pull/12486


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #12494: MINOR: Update site docs for ASF compliance

2022-08-09 Thread GitBox


bbejeck commented on PR #12494:
URL: https://github.com/apache/kafka/pull/12494#issuecomment-1209815424

   > Should we fix the issues I reported in [apache/kafka-site#433 
(comment)](https://github.com/apache/kafka-site/pull/433#issuecomment-1205049698)
 before merging the images back to Kafka?
   
   I've taken a stab at addressing those comments - this may be the best I can 
do


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14138) The Exception Throwing Behavior of Transactional Producer is Inconsistent

2022-08-09 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17577552#comment-17577552
 ] 

Sagar Rao commented on KAFKA-14138:
---

Thanks [~guozhang]. I went through KIP-691 and got some ideas around the 
expectation. Having said that, it looks like the KIP isn't implemented yet and 
some of the newly proposed exceptions aren't part of the codebase. 

> The Exception Throwing Behavior of Transactional Producer is Inconsistent
> -
>
> Key: KAFKA-14138
> URL: https://issues.apache.org/jira/browse/KAFKA-14138
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Critical
>
> There's an issue for inconsistent error throwing inside Kafka Producer when 
> transactions are enabled. In short, there are two places where the received 
> error code from the brokers would be eventually thrown to the caller:
> * Recorded on the batch's metadata, via "Sender#failBatch"
> * Recorded on the txn manager, via "txnManager#handleFailedBatch".
> The former would be thrown from 1) the `Future` returned from 
> the `send`; or 2) the `callback` inside `send(record, callback)`. Whereas, 
> the latter would be thrown from `producer.send()` directly in which we call 
> `txnManager.maybeAddPartition -> maybeFailWithError`. However, when thrown 
> from the former, it's not wrapped hence the direct exception (e.g. 
> ClusterAuthorizationException), whereas in the latter it's wrapped as, e.g. 
> KafkaException(ClusterAuthorizationException). Which one is thrown depends on 
> a race condition, since we cannot control whether, by the time the caller 
> thread calls `txnManager.maybeAddPartition`, the previous produce request's 
> error has already been sent back.
> For example consider the following sequence for idempotent producer:
> 1. caller thread: within future = producer.send(), call 
> recordAccumulator.append
> 2. sender thread: drain the accumulator, send the produceRequest and get the 
> error back.
> 3. caller thread: within future = producer.send(), call 
> txnManager.maybeAddPartition, in which we would check `maybeFailWithError` 
> before `isTransactional`.
> 4. caller thread: future.get()
> In a sequence where then 3) happened before 2), we would only get the raw 
> exception at step 4; in a sequence where 2) happened before 3), then we would 
> throw the exception immediately at 3).
> This inconsistent error throwing is pretty annoying for users since they'd 
> need to handle both cases, but many of them actually do not know this 
> trickiness. We should make the error throwing consistent, e.g. we should 
> consider: 1) which errors would be thrown from callback / future.get, and 
> which would be thrown from the `send` call directly, and these errors should 
> ideally be non-overlapping; 2) whether we should wrap the raw error or not, 
> which we should do consistently.
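
Until the behavior is made consistent, a hedged caller-side sketch (the topic name and 
the `producer` instance are assumptions) that handles both of the paths described above:

```scala
import java.util.concurrent.ExecutionException
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.KafkaException

// Hedged sketch: `producer` is an assumed idempotent/transactional producer.
def sendHandlingBothPaths(producer: KafkaProducer[String, String]): Unit = {
  try {
    val future = producer.send(new ProducerRecord("orders", "key", "value"))
    // Path 1: an error recorded on the batch surfaces here, wrapped in an
    // ExecutionException around the raw exception.
    future.get()
  } catch {
    case e: ExecutionException =>
      throw new RuntimeException("produce failed (batch path)", e.getCause)
    case e: KafkaException =>
      // Path 2: an error recorded on the transaction manager is thrown from
      // send() itself, already wrapped as KafkaException(cause).
      throw new RuntimeException("produce failed (transaction manager path)", e.getCause)
  }
}
```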



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14151) Add validation to fail fast when base offsets are incorrectly assigned to batches

2022-08-09 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-14151:
-
Summary: Add validation to fail fast when base offsets are incorrectly 
assigned to batches  (was: Add additional validation to protect on-disk log 
segment data from being corrupted)

> Add validation to fail fast when base offsets are incorrectly assigned to 
> batches
> -
>
> Key: KAFKA-14151
> URL: https://issues.apache.org/jira/browse/KAFKA-14151
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: Vincent Jiang
>Priority: Major
>
> We saw a case where records with incorrect offsets were being written to log 
> segment on-disk data due to environmental issues (bug in old version JVM 
> JIT). We should consider adding additional validation to detect this scenario 
> and fail fast.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14151) Add additional validation to protect on-disk log segment data from being corrupted

2022-08-09 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-14151:
-
Description: We saw a case where records with incorrect offsets were being 
written to log segment on-disk data due to environmental issues (bug in old 
version JVM JIT). We should consider adding additional validation to detect 
this scenario and fail fast.  (was: We received escalations reporting bad 
records being written to log segment on-disk data due to environmental issues 
(bug in old version JVM jit).  We should consider adding additional validation 
to  protect the on-disk data from being corrupted by inadvertent bugs or 
environmental issues)

> Add additional validation to protect on-disk log segment data from being 
> corrupted
> --
>
> Key: KAFKA-14151
> URL: https://issues.apache.org/jira/browse/KAFKA-14151
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: Vincent Jiang
>Priority: Major
>
> We saw a case where records with incorrect offsets were being written to log 
> segment on-disk data due to environmental issues (bug in old version JVM 
> JIT). We should consider adding additional validation to detect this scenario 
> and fail fast.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14151) Add additional validation to protect on-disk log segment data from being corrupted

2022-08-09 Thread Vincent Jiang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vincent Jiang updated KAFKA-14151:
--
Priority: Major  (was: Minor)

> Add additional validation to protect on-disk log segment data from being 
> corrupted
> --
>
> Key: KAFKA-14151
> URL: https://issues.apache.org/jira/browse/KAFKA-14151
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: Vincent Jiang
>Priority: Major
>
> We received escalations reporting bad records being written to log segment 
> on-disk data due to environmental issues (bug in old version JVM jit).  We 
> should consider adding additional validation to  protect the on-disk data 
> from being corrupted by inadvertent bugs or environmental issues



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14151) Add additional validation to protect on-disk log segment data from being corrupted

2022-08-09 Thread Vincent Jiang (Jira)
Vincent Jiang created KAFKA-14151:
-

 Summary: Add additional validation to protect on-disk log segment 
data from being corrupted
 Key: KAFKA-14151
 URL: https://issues.apache.org/jira/browse/KAFKA-14151
 Project: Kafka
  Issue Type: Improvement
  Components: log
Reporter: Vincent Jiang


We received escalations reporting bad records being written to on-disk log 
segment data due to an environmental issue (a bug in an old JVM version's JIT). 
We should consider adding additional validation to protect the on-disk data 
from being corrupted by inadvertent bugs or environmental issues.
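
A minimal sketch of the kind of fail-fast check being suggested (illustration 
only; the helper name, its placement, and the exception choice are assumptions, 
not an actual patch):
{code:scala}
// Hypothetical fail-fast validation over incoming record batches: reject an
// append whose batch base offsets fall behind the expected next offset.
import org.apache.kafka.common.InvalidRecordException
import org.apache.kafka.common.record.MemoryRecords

def validateBaseOffsets(records: MemoryRecords, expectedNextOffset: Long): Unit = {
  var next = expectedNextOffset
  records.batches.forEach { batch =>
    if (batch.baseOffset < next)
      throw new InvalidRecordException(
        s"Batch base offset ${batch.baseOffset} is lower than the expected next offset $next")
    next = batch.lastOffset + 1
  }
}
{code}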



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14151) Add additional validation to protect on-disk log segment data from being corrupted

2022-08-09 Thread Vincent Jiang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vincent Jiang updated KAFKA-14151:
--
Priority: Minor  (was: Major)

> Add additional validation to protect on-disk log segment data from being 
> corrupted
> --
>
> Key: KAFKA-14151
> URL: https://issues.apache.org/jira/browse/KAFKA-14151
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: Vincent Jiang
>Priority: Minor
>
> We received escalations reporting bad records being written to on-disk log 
> segment data due to an environmental issue (a bug in an old JVM version's 
> JIT). We should consider adding additional validation to protect the on-disk 
> data from being corrupted by inadvertent bugs or environmental issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji commented on a diff in pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

2022-08-09 Thread GitBox


hachikuji commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r941604866


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -881,11 +881,16 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
 metadataCache match {
   // In KRaft mode, only replicas which are not fenced nor in controlled 
shutdown are
-  // allowed to join the ISR. This does not apply to ZK mode.
+  // allowed to join the ISR.
   case kRaftMetadataCache: KRaftMetadataCache =>
 !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
   !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId)
 
+  // In ZK mode, we just ensure the broker is alive. Although we do not 
check for shutting down brokers here,'

Review Comment:
   nit: unintentional apostrophe at the end?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

2022-08-09 Thread GitBox


jolshan commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r941598423


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
 metadataCache match {
   // In KRaft mode, only replicas which are not fenced nor in controlled 
shutdown are
-  // allowed to join the ISR. This does not apply to ZK mode.
+  // allowed to join the ISR. In ZK mode, we just ensure the broker is 
alive and not shutting down.

Review Comment:
   Ah my comment is slightly different in the latest commit. Let me know if I 
should change it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12469: KAFKA-13914: Add command line tool kafka-metadata-quorum.sh

2022-08-09 Thread GitBox


hachikuji commented on code in PR #12469:
URL: https://github.com/apache/kafka/pull/12469#discussion_r941548893


##
core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala:
##
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.tools.TerseFailure
+import kafka.utils.Exit
+import net.sourceforge.argparse4j.ArgumentParsers
+import net.sourceforge.argparse4j.impl.Arguments.fileType
+import net.sourceforge.argparse4j.inf.Subparsers
+import org.apache.kafka.clients._
+import org.apache.kafka.clients.admin.{Admin, QuorumInfo}
+import org.apache.kafka.common.utils.Utils
+
+import java.io.File
+import java.util.Properties
+import scala.jdk.CollectionConverters._
+
+/**
+ * A tool for describing quorum status
+ */
+object MetadataQuorumCommand {
+
+  def main(args: Array[String]): Unit = {
+val res = mainNoExit(args)
+Exit.exit(res)
+  }
+
+  def mainNoExit(args: Array[String]): Int = {
+val parser = ArgumentParsers.newArgumentParser("kafka-metadata-quorum")
+  .defaultHelp(true)
+  .description("This tool describes kraft metadata quorum status.")
+parser.addArgument("--bootstrap-server")
+  .help("A comma-separated list of host:port pairs to use for establishing 
the connection to the Kafka cluster.")
+  .required(true)
+
+parser.addArgument("--command-config")
+  .`type`(fileType())
+  .help("Property file containing configs to be passed to Admin Client.")
+val subparsers = parser.addSubparsers().dest("command")
+addDescribeParser(subparsers)
+
+try {
+  val namespace = parser.parseArgsOrFail(args)
+  val command = namespace.getString("command")
+
+  val commandConfig = namespace.get[File]("command_config")
+  val props = if (commandConfig != null) {
+if (!commandConfig.exists()) {
+  throw new TerseFailure(s"Properties file ${commandConfig.getPath} 
does not exists!")
+}
+Utils.loadProps(commandConfig.getPath)
+  } else {
+new Properties()
+  }
+  props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
namespace.getString("bootstrap_server"))
+  val admin = Admin.create(props)
+
+  if (command == "describe") {
+handleDescribe(admin)
+  } else {
+// currently we only support describe
+  }
+  admin.close()
+  0
+} catch {
+  case e: TerseFailure =>
+Console.err.println(e.getMessage)
+1
+}
+  }
+
+  def addDescribeParser(subparsers: Subparsers): Unit = {
+subparsers.addParser("describe")
+  .help("Describe the metadata quorum info")
+  }
+
+  def handleDescribe(admin: Admin): Unit = {

Review Comment:
   The output seems a bit different from what was documented in KIP-595 (and 
KIP-836):
   - 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-ToolingSupport.
 
   - 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-836:+Addition+of+Information+in+DescribeQuorumResponse+about+Voter+Lag
   
   Any reason to change it?



##
core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala:
##
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.tools.TerseFailure
+import kafka.utils.Exit
+import net.sourceforge.argparse4j.ArgumentParsers
+import 

[GitHub] [kafka] jolshan commented on a diff in pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

2022-08-09 Thread GitBox


jolshan commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r941543760


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
 metadataCache match {
   // In KRaft mode, only replicas which are not fenced nor in controlled 
shutdown are
-  // allowed to join the ISR. This does not apply to ZK mode.
+  // allowed to join the ISR. In ZK mode, we just ensure the broker is 
alive and not shutting down.

Review Comment:
   Ok makes sense. Should I change the comment to reflect that this will not 
block shutting down brokers here, but will be blocked controller side?
   
   I think for at least this PR (which we want to get into 3.3) we should hold 
off on protocol changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

2022-08-09 Thread GitBox


jolshan commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r941537699


##
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##
@@ -1045,6 +1049,78 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
 assertEquals(expectedAlterPartitionResponse, future.get(10, 
TimeUnit.SECONDS))
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testAlterPartitionVersionSource"))

Review Comment:
   thanks! I was trying to remember how to do this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14144) AlterPartition is not idempotent when requests time out

2022-08-09 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-14144.
-
Resolution: Fixed

> AlterPartition is not idempotent when requests time out
> ---
>
> Key: KAFKA-14144
> URL: https://issues.apache.org/jira/browse/KAFKA-14144
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: David Mao
>Assignee: David Mao
>Priority: Blocker
> Fix For: 3.3.0
>
>
> [https://github.com/apache/kafka/pull/12032] changed the validation order of 
> AlterPartition requests to fence requests with a stale partition epoch before 
> we compare the leader and ISR contents.
> This results in a loss of idempotency if a leader does not receive an 
> AlterPartition response because retries will receive an 
> INVALID_UPDATE_VERSION error.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji merged pull request #12489: KAFKA-14144: Compare AlterPartition LeaderAndIsr before fencing partition epoch

2022-08-09 Thread GitBox


hachikuji merged PR #12489:
URL: https://github.com/apache/kafka/pull/12489


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14134) Replace EasyMock with Mockito for WorkerConnectorTest

2022-08-09 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-14134:
---
Fix Version/s: 3.4.0

> Replace EasyMock with Mockito for WorkerConnectorTest
> -
>
> Key: KAFKA-14134
> URL: https://issues.apache.org/jira/browse/KAFKA-14134
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Priority: Minor
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] yashmayya commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

2022-08-09 Thread GitBox


yashmayya commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r941408595


##
docs/connect.html:
##
@@ -440,19 +443,17 @@ Connector
 
 @Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
+// This method is where connectors provide the task configs for the tasks 
that are to be created for this connector.
+// The length of the list determines the number of tasks that need to be 
created. The FileStreamSourceConnector, however, is
+// only capable of spinning up a single task (since there isn't work that 
can be distributed among multiple tasks).
+// Note that the task configs could contain configs additional to or 
different from the connector configs if needed (for instance,
+// if different tasks have different responsibilities, or if different 
tasks are meant to process different subsets of the source data stream).

Review Comment:
   Ah, fair point. I've trimmed it down to a couple of sentences. 



##
docs/connect.html:
##
@@ -440,19 +443,17 @@ Connector
 
 @Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
+// This method is where connectors provide the task configs for the tasks 
that are to be created for this connector.
+// The length of the list determines the number of tasks that need to be 
created. The FileStreamSourceConnector, however, is
+// only capable of spinning up a single task (since there isn't work that 
can be distributed among multiple tasks).
+// Note that the task configs could contain configs additional to or 
different from the connector configs if needed (for instance,
+// if different tasks have different responsibilities, or if different 
tasks are meant to process different subsets of the source data stream).

Review Comment:
   Ah, fair point. I've trimmed it down to a couple of sentences. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12490: KAFKA-14147: Prevent deferredTaskUpdates map from growing monotonically in KafkaConfigBackingStore

2022-08-09 Thread GitBox


C0urante commented on code in PR #12490:
URL: https://github.com/apache/kafka/pull/12490#discussion_r941405027


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -853,6 +853,9 @@ private void processConnectorConfigRecord(String 
connectorName, SchemaAndValue v
 connectorConfigs.remove(connectorName);
 connectorTaskCounts.remove(connectorName);
 taskConfigs.keySet().removeIf(taskId -> 
taskId.connector().equals(connectorName));
+deferredTaskUpdates.remove(connectorName);
+connectorTaskCountRecords.remove(connectorName);

Review Comment:
   > Tbh, it doesn't really seem like it's worth the mess of null handling 
everywhere. I'm gonna back out this change and make this a single line PR 
   
   A single line PR... with tests? 
   
   Your understanding is correct--we only track task generations in memory, 
different herders may have different generations for the same set of task 
configs, and we use generations to abort task startup after initializing their 
transactional producer and to abort persisting zombie fencing records to the 
config topic.
   
   The reason this is all fine is that we don't really need to track an exact 
generation number; all we have to do is track whether a new set of task configs 
for a given connector has appeared after a specific set of task configs. 
Compaction should not change the fact that, once we have a generation number 
for a set of task configs, generation numbers for later task configs will be 
greater than that number.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #12472: KAFKA-14134: Replace EasyMock with Mockito for WorkerConnectorTest

2022-08-09 Thread GitBox


C0urante merged PR #12472:
URL: https://github.com/apache/kafka/pull/12472


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12472: KAFKA-14134: Replace EasyMock with Mockito for WorkerConnectorTest

2022-08-09 Thread GitBox


C0urante commented on code in PR #12472:
URL: https://github.com/apache/kafka/pull/12472#discussion_r941388856


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java:
##
@@ -120,43 +105,21 @@ public void testInitializeFailure() {
 workerConnector.doShutdown();
 assertStoppedMetric(workerConnector);
 
-verifyAll();
+verifyCleanInitialize();
+verify(listener).onFailure(CONNECTOR, exception);
+verify(listener).onShutdown(CONNECTOR);

Review Comment:
   Yeah, that's fair. LGTM  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

2022-08-09 Thread GitBox


C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r941371749


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -20,36 +20,35 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports 
both source and
- * sink modes via its 'mode' setting.
+ * Very simple source connector that works with stdin or a file.
  */
 public class FileStreamSourceConnector extends SourceConnector {
+
+private static final Logger log = 
LoggerFactory.getLogger(FileStreamSourceConnector.class);
 public static final String TOPIC_CONFIG = "topic";
 public static final String FILE_CONFIG = "file";
 public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
 public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
 
-private static final ConfigDef CONFIG_DEF = new ConfigDef()
+static final ConfigDef CONFIG_DEF = new ConfigDef()
 .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source 
filename. If not specified, the standard input will be used")
-.define(TOPIC_CONFIG, Type.LIST, Importance.HIGH, "The topic to 
publish data to")
+.define(TOPIC_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new 
ConfigDef.NonEmptyString(), Importance.HIGH, "The topic to publish data to")
 .define(TASK_BATCH_SIZE_CONFIG, Type.INT, DEFAULT_TASK_BATCH_SIZE, 
Importance.LOW,
 "The maximum number of records the Source task can read from 
file one time");

Review Comment:
   ```suggestion
   "The maximum number of records the source task can read from 
the file each time it is polled");
   ```



##
docs/connect.html:
##
@@ -423,9 +422,13 @@ Connector
 
 @Override
public void start(Map<String, String> props) {
-// The complete version includes error handling as well.
-filename = props.get(FILE_CONFIG);
-topic = props.get(TOPIC_CONFIG);
+// All initialization logic and setting up of resources goes in this 
method. The FileStreamSourceConnector, however, doesn't need such logic here.

Review Comment:
   ```suggestion
   // Initialization logic and setting up resources can take place in this 
method. This connector doesn't need to do any of that, but we do log a helpful 
message to the user.
   ```



##
docs/connect.html:
##
@@ -440,19 +443,17 @@ Connector
 
 @Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
+// This method is where connectors provide the task configs for the tasks 
that are to be created for this connector.
+// The length of the list determines the number of tasks that need to be 
created. The FileStreamSourceConnector, however, is
+// only capable of spinning up a single task (since there isn't work that 
can be distributed among multiple tasks).
+// Note that the task configs could contain configs additional to or 
different from the connector configs if needed (for instance,
+// if different tasks have different responsibilities, or if different 
tasks are meant to process different subsets of the source data stream).

Review Comment:
   This is really verbose. Can we simplify? I was hoping we'd be able to spell 
things out here in 1-2 lines.
   
   Keep in mind that the next paragraph provides a lot of useful info already:
   > Even with multiple tasks, this method implementation is usually pretty 
simple. It just has to determine the number of input tasks, which may require 
contacting the remote service it is pulling data from, and then divvy them up. 
Because some patterns for splitting work among tasks are so common, some 
utilities are provided in ConnectorUtils to simplify these cases.



##
docs/connect.html:
##
@@ -609,9 +618,11 @@ Connect Configuration Valida
 The following code in FileStreamSourceConnector defines 
the configuration and exposes it to the framework.
 
 
-private static final ConfigDef CONFIG_DEF = new ConfigDef()
-.define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
-.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish 
data to");
+static final ConfigDef CONFIG_DEF = new ConfigDef()
+.define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. 
If not specified, the standard input will be used")
+.define(TOPIC_CONFIG, Type.STRING, 

[GitHub] [kafka] mimaison commented on pull request #12494: MINOR: Update site docs for ASF compliance

2022-08-09 Thread GitBox


mimaison commented on PR #12494:
URL: https://github.com/apache/kafka/pull/12494#issuecomment-1209402011

   Thanks @bbejeck !
   Should we fix the issues I reported in 
https://github.com/apache/kafka-site/pull/433#issuecomment-1205049698 before 
merging the images back to Kafka?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #12494: MINOR: Update site docs for ASF compliance

2022-08-09 Thread GitBox


bbejeck commented on PR #12494:
URL: https://github.com/apache/kafka/pull/12494#issuecomment-1209389661

   @mimaison added images


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()

2022-08-09 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17573211#comment-17573211
 ] 

Matthew de Detrich edited comment on KAFKA-14014 at 8/9/22 12:58 PM:
-

In case people want to reproduce the flakiness, assuming you have a working 
docker installation you can do the following
{code:java}
docker run -it --cpus=2 --rm -u gradle -v "$PWD":/home/gradle/project -w 
/home/gradle/project gradle sh{code}
where cpus=2 is how you choose how many CPUs you want (there is a tradeoff 
between a higher chance of hitting the flakiness and how fast the test runs). 
I wouldn't recommend going lower than cpus=2, otherwise even building the 
kafka project in Gradle can take ages.

The above command will put you into a shell at which point you can do
{code:java}
while [ $? -eq 0 ]; do ./gradlew :streams:test --tests 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets;
 done{code}
This will re-run the test until there is a failure. 

Note that due to how Gradle caches test runs, you need to do something like 
[https://github.com/gradle/gradle/issues/9151#issue-434212465] or put
{code:java}
test.outputs.upToDateWhen {false}{code}
in order to force gradle to re-run the test every time.


was (Author: mdedetrich-aiven):
In case people want to reproduce the flakiness, assuming you have a working 
docker installation you can do the following
{code:java}
docker run -it --cpus=2 --rm -u gradle -v "$PWD":/home/gradle/project -w 
/home/gradle/project gradle sh{code}
where cpus=2 is how you choose how many CPUs you want (there is a tradeoff 
between a higher chance of hitting the flakiness and how fast the test runs). 
I wouldn't recommend going lower than cpus=2, otherwise even building the 
kafka project in Gradle can take ages.

The above command will put you into a shell at which point you can do
{code:java}
while [ $? -eq 0 ]; do ./gradlew :streams:test --tests 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets;
 done{code}
This will re-run the test until there is a failure. 

Note that due to how Gradle caches test runs, you need to do something like 
[https://github.com/gradle/gradle/issues/9151#issue-434212465] in order to 
force gradle to re-run the test every time.

> Flaky test 
> NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
> 
>
> Key: KAFKA-14014
> URL: https://issues.apache.org/jira/browse/KAFKA-14014
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Matthew de Detrich
>Priority: Critical
>  Labels: flaky-test
>
> {code:java}
> java.lang.AssertionError: 
> Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]>
>  but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>  

[GitHub] [kafka] bbejeck commented on pull request #12494: MINOR: Update site docs for ASF compliance

2022-08-09 Thread GitBox


bbejeck commented on PR #12494:
URL: https://github.com/apache/kafka/pull/12494#issuecomment-1209340844

   > The path of the images is `/{{version}}/images/`, so do we need to add 
them to this repo too?
   
   Yes I overlooked that - will do


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #12485: KAFKA-14131: Adding InterruptException when reading to end of Offseto…

2022-08-09 Thread GitBox


vamossagar12 commented on PR #12485:
URL: https://github.com/apache/kafka/pull/12485#issuecomment-1209328983

   > This should definitely come with a test :)
   > 
   > I'm also not sure this is the best approach, since the 
[ExecutorService::shutdownNow 
Javadocs](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ExecutorService.html#shutdownNow())
 don't give us any guarantees about threads being interrupted:
   > 
   > > There are no guarantees beyond best-effort attempts to stop processing 
actively executing tasks. For example, typical implementations will cancel via 
[Thread.interrupt()](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Thread.html#interrupt()),
 so any task that fails to respond to interrupts may never terminate.
   > 
   > And on top of that, by the time the thread is interrupted--if it's 
interrupted at all--we've already exhausted our graceful shutdown timeout.
   > 
   > Can we leverage the existing shutdown logic in `KafkaBasedLog::stop` 
somehow, possibly by accounting for a `WakeupException` being thrown while 
reading to the end of the log during `KafkaBasedLog::start`? I'm not certain 
that it's safe to stop a log while another thread is in the middle of starting 
the log; we may have to tweak some of the logic there. We may also have to wake 
up/interrupt/shut down the admin client (if we're using one to read offsets) 
since that too could potentially block (but perhaps not indefinitely).
   
   Thanks Chris. I agree we do need a test; I'll figure out how to add one.
   
   I think you brought up a great point. Here's what I understood from the 
issue. If a DistributedHerder gets a signal to shut down in its `stop` method, 
it invokes `shutdown` and waits for the graceful shutdown timeout. 
`awaitTermination` can throw an `InterruptedException`, and once that's thrown, 
the KafkaBasedLog remains in an infinite loop, since I believe the initial 
readToEnd runs on the same thread (it is switched to a `WorkerThread` later 
on). That's the interruption I was looking to handle in this PR. Do you think 
that makes sense?
   
   IIUC, the scenario you have described is that even after the graceful 
shutdown timeout, if the log isn't stopped and the herder thread never gets 
interrupted, the potential infinite loop issue still remains. Is that correct?
   
   I think using `KafkaBasedLog::stop` might be a good idea in this case. BTW, 
I see there's something called `stopServices` which effectively stops the 
backing consumers. It calls the `KafkaBasedLog::stop` method, and it's all 
wired up through the `halt` method, which checks a `stopping` flag that is set 
in `stop` (BTW, I know you know all this, I'm just writing it down for my own 
confirmation :D). Do you think the situation you described would still arise, 
or was the only unhandled case the interruption of the herder executor? WDYT?
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna opened a new pull request, #12497: KAFKA-10199: Expose read only task from state updater

2022-08-09 Thread GitBox


cadonna opened a new pull request, #12497:
URL: https://github.com/apache/kafka/pull/12497

   The state updater exposes tasks that are in restoration
   to the stream thread. To ensure that the stream thread
   only reads from these tasks without modifying any
   internal state, this PR introduces a read-only task
   that throws an exception if the caller tries to modify
   the internal state of a task.
   
   This PR also returns read-only tasks from
   DefaultStateUpdater#getTasks().
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests

2022-08-09 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-14133:
--
Description: 
{color:#DE350B}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which still require to be moved from EasyMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#FF8B00}In Review{color}

# WorkerConnectorTest (connect) (owner: Yash)
# WorkerCoordinatorTest (connect)
# RootResourceTest (connect)
# ByteArrayProducerRecordEquals (connect)
# TopologyTest
# {color:#FF8B00}KStreamFlatTransformTest{color} (owner: Christo)
# {color:#FF8B00}KStreamFlatTransformValuesTest{color} (owner: Christo)
# {color:#FF8B00}KStreamPrintTest{color} (owner: Christo)
# {color:#FF8B00}KStreamRepartitionTest{color} (owner: Christo)
# {color:#FF8B00}MaterializedInternalTest{color} (owner: Christo)
# {color:#FF8B00}TransformerSupplierAdapterTest{color} (owner: Christo)
# {color:#FF8B00}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
# {color:#FF8B00}ClientUtilsTest{color} (owner: Christo)
# {color:#FF8B00}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
Christo)
# KTableSuppressProcessorTest
# InternalTopicManagerTest
# ProcessorContextImplTest
# ProcessorStateManagerTest
# StandbyTaskTest
# StoreChangelogReaderTest
# StreamTaskTest
# StreamThreadTest
# StreamsAssignmentScaleTest (*WIP* owner: Christo)
# StreamsPartitionAssignorTest (*WIP* owner: Christo)
# StreamsRebalanceListenerTest
# TaskManagerTest
# TimestampedKeyValueStoreMaterializerTest
# WriteConsistencyVectorTest
# AssignmentTestUtils (*WIP* owner: Christo)
# StreamsMetricsImplTest
# CachingInMemoryKeyValueStoreTest
# CachingInMemorySessionStoreTest
# CachingPersistentSessionStoreTest
# CachingPersistentWindowStoreTest
# ChangeLoggingKeyValueBytesStoreTest
# ChangeLoggingSessionBytesStoreTest
# ChangeLoggingTimestampedKeyValueBytesStoreTest
# ChangeLoggingTimestampedWindowBytesStoreTest
# ChangeLoggingWindowBytesStoreTest
# CompositeReadOnlyWindowStoreTest
# KeyValueStoreBuilderTest
# MeteredTimestampedWindowStoreTest
# RocksDBStoreTest
# StreamThreadStateStoreProviderTest
# TimeOrderedCachingPersistentWindowStoreTest
# TimeOrderedWindowStoreTest

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*

  was:
{color:#DE350B}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which still require to be moved from EasyMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

# WorkerConnectorTest (connect) (owner: Yash)
# WorkerCoordinatorTest (connect)
# RootResourceTest (connect)
# ByteArrayProducerRecordEquals (connect)
# TopologyTest
# {color:#FF8B00}KStreamFlatTransformTest{color} (owner: Christo)
# {color:#FF8B00}KStreamFlatTransformValuesTest{color} (owner: Christo)
# {color:#FF8B00}KStreamPrintTest{color} (owner: Christo)
# {color:#FF8B00}KStreamRepartitionTest{color} (owner: Christo)
# {color:#FF8B00}MaterializedInternalTest{color} (owner: Christo)
# {color:#FF8B00}TransformerSupplierAdapterTest{color} (owner: Christo)
# {color:#FF8B00}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
# {color:#FF8B00}ClientUtilsTest{color} (owner: Christo)
# {color:#FF8B00}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
Christo)
# KTableSuppressProcessorTest
# InternalTopicManagerTest
# ProcessorContextImplTest
# ProcessorStateManagerTest
# StandbyTaskTest
# StoreChangelogReaderTest
# StreamTaskTest
# StreamThreadTest
# StreamsAssignmentScaleTest (*WIP* owner: Christo)
# StreamsPartitionAssignorTest (*WIP* owner: Christo)
# StreamsRebalanceListenerTest
# TaskManagerTest
# TimestampedKeyValueStoreMaterializerTest
# WriteConsistencyVectorTest
# AssignmentTestUtils (*WIP* owner: Christo)
# StreamsMetricsImplTest
# CachingInMemoryKeyValueStoreTest
# CachingInMemorySessionStoreTest
# CachingPersistentSessionStoreTest
# CachingPersistentWindowStoreTest
# ChangeLoggingKeyValueBytesStoreTest
# ChangeLoggingSessionBytesStoreTest
# ChangeLoggingTimestampedKeyValueBytesStoreTest
# ChangeLoggingTimestampedWindowBytesStoreTest
# ChangeLoggingWindowBytesStoreTest
# CompositeReadOnlyWindowStoreTest
# KeyValueStoreBuilderTest
# MeteredTimestampedWindowStoreTest
# RocksDBStoreTest
# StreamThreadStateStoreProviderTest
# TimeOrderedCachingPersistentWindowStoreTest
# TimeOrderedWindowStoreTest

*The coverage report for the above tests 

[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests

2022-08-09 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-14133:
--
Description: 
{color:#DE350B}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which still require to be moved from EasyMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

# WorkerConnectorTest (connect) (owner: Yash)
# WorkerCoordinatorTest (connect)
# RootResourceTest (connect)
# ByteArrayProducerRecordEquals (connect)
# TopologyTest
# {color:#FF8B00}KStreamFlatTransformTest{color} (owner: Christo)
# {color:#FF8B00}KStreamFlatTransformValuesTest{color} (owner: Christo)
# {color:#FF8B00}KStreamPrintTest{color} (owner: Christo)
# {color:#FF8B00}KStreamRepartitionTest{color} (owner: Christo)
# {color:#FF8B00}MaterializedInternalTest{color} (owner: Christo)
# {color:#FF8B00}TransformerSupplierAdapterTest{color} (owner: Christo)
# {color:#FF8B00}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
# {color:#FF8B00}ClientUtilsTest{color} (owner: Christo)
# {color:#FF8B00}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
Christo)
# KTableSuppressProcessorTest
# InternalTopicManagerTest
# ProcessorContextImplTest
# ProcessorStateManagerTest
# StandbyTaskTest
# StoreChangelogReaderTest
# StreamTaskTest
# StreamThreadTest
# StreamsAssignmentScaleTest (*WIP* owner: Christo)
# StreamsPartitionAssignorTest (*WIP* owner: Christo)
# StreamsRebalanceListenerTest
# TaskManagerTest
# TimestampedKeyValueStoreMaterializerTest
# WriteConsistencyVectorTest
# AssignmentTestUtils (*WIP* owner: Christo)
# StreamsMetricsImplTest
# CachingInMemoryKeyValueStoreTest
# CachingInMemorySessionStoreTest
# CachingPersistentSessionStoreTest
# CachingPersistentWindowStoreTest
# ChangeLoggingKeyValueBytesStoreTest
# ChangeLoggingSessionBytesStoreTest
# ChangeLoggingTimestampedKeyValueBytesStoreTest
# ChangeLoggingTimestampedWindowBytesStoreTest
# ChangeLoggingWindowBytesStoreTest
# CompositeReadOnlyWindowStoreTest
# KeyValueStoreBuilderTest
# MeteredTimestampedWindowStoreTest
# RocksDBStoreTest
# StreamThreadStateStoreProviderTest
# TimeOrderedCachingPersistentWindowStoreTest
# TimeOrderedWindowStoreTest

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*

  was:
{color:#DE350B}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which still require to be moved from EasyMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

# WorkerConnectorTest (connect) (owner: Yash)
# WorkerCoordinatorTest (connect)
# RootResourceTest (connect)
# ByteArrayProducerRecordEquals (connect)
# TopologyTest
# KStreamFlatTransformTest (owner: Christo)
# KStreamFlatTransformValuesTest (owner: Christo)
# KStreamPrintTest (owner: Christo)
# KStreamRepartitionTest (owner: Christo)
# MaterializedInternalTest (owner: Christo)
# TransformerSupplierAdapterTest (owner: Christo)
# KTableSuppressProcessorMetricsTest (owner: Christo)
# KTableSuppressProcessorTest
# ClientUtilsTest (owner: Christo)
# HighAvailabilityStreamsPartitionAssignorTest (owner: Christo)
# InternalTopicManagerTest
# ProcessorContextImplTest
# ProcessorStateManagerTest
# StandbyTaskTest
# StoreChangelogReaderTest
# StreamTaskTest
# StreamThreadTest
# StreamsAssignmentScaleTest (*WIP* owner: Christo)
# StreamsPartitionAssignorTest (*WIP* owner: Christo)
# StreamsRebalanceListenerTest
# TaskManagerTest
# TimestampedKeyValueStoreMaterializerTest
# WriteConsistencyVectorTest
# AssignmentTestUtils (*WIP* owner: Christo)
# StreamsMetricsImplTest
# CachingInMemoryKeyValueStoreTest
# CachingInMemorySessionStoreTest
# CachingPersistentSessionStoreTest
# CachingPersistentWindowStoreTest
# ChangeLoggingKeyValueBytesStoreTest
# ChangeLoggingSessionBytesStoreTest
# ChangeLoggingTimestampedKeyValueBytesStoreTest
# ChangeLoggingTimestampedWindowBytesStoreTest
# ChangeLoggingWindowBytesStoreTest
# CompositeReadOnlyWindowStoreTest
# KeyValueStoreBuilderTest
# MeteredTimestampedWindowStoreTest
# RocksDBStoreTest
# StreamThreadStateStoreProviderTest
# TimeOrderedCachingPersistentWindowStoreTest
# TimeOrderedWindowStoreTest

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*


> Remaining EasyMock to Mockito tests
> ---
>
> Key: KAFKA-14133
> URL: 

[jira] [Created] (KAFKA-14150) Allocation of initial partitions is deterministic and produces a leader bias when a broker is offline

2022-08-09 Thread David Buckley (Jira)
David Buckley created KAFKA-14150:
-

 Summary: Allocation of initial partitions is deterministic and 
produces a leader bias when a broker is offline
 Key: KAFKA-14150
 URL: https://issues.apache.org/jira/browse/KAFKA-14150
 Project: Kafka
  Issue Type: Improvement
Reporter: David Buckley


Observation of our current cluster suggests that with N brokers, the first N 
partitions are always allocated in a round-robin format with a random offset. 
The preferred leader is always the first in a given replica list (and hence is 
allocated round-robin, too). Subsequent brokers are allocated using some 
shuffle on the list, again in a round-robin, which I think is fine and doesn't 
show the bias I detail below. Suppose every topic has as many partitions as 
there are brokers and replication factor of 3. Then every topic has replicas 
{{N, N+1, N+2}} except where this wraps. Example:

* Topic A: 3 partitions, replicas {{012}}, {{120}}, {{201}}, leaders 0, 1, 2
* Topic B: 3 partitions, replicas {{120}}, {{201}}, {{012}}, leaders 1, 2, 0
* Topic C: 3 partitions, replicas {{201}}, {{012}}, {{120}}, leaders 2, 0, 1

This means that if broker {{x}} goes down, every partition that had {{x}} as 
its preferred leader now elects {{x+1}} as its leader -- for example, with 
broker 1 offline, the leader allocation now looks like:

* Topic A: 3 partitions, replicas {{02}}, {{20}}, {{20}}, leaders 0, 2, 2
* Topic B: 3 partitions, replicas {{20}}, {{20}}, {{02}}, leaders 2, 2, 0
* Topic C: 3 partitions, replicas {{20}}, {{02}}, {{20}}, leaders 2, 0, 2

We see that broker 2 becomes leader of 100% of the failed-over partitions, and 
is now leader of 2x as many partitions as broker 0.

If there were 6 brokers, we'd see that replica sets {{02}}, {{23}} and {{50}} 
would have reduced replication (and broker 4 isn't providing any redundancy for 
partitions replicated in broker 1) in addition to broker 2 leading 2x as many 
partitions as any other broker. Brokers 0 and 2 are now more critical than 3 
and 5, which are in turn more critical than broker 4.

I'm unclear if there's any undesirable side-effects of this, but my expectation 
is that the behaviour isn't really intended because subsequent partitions don't 
just replicate the round-robin of the first batch. Should the allocation of the 
initial partitions be completely random to avoid this bias, or is it 
inconsequential?
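
For illustration, a self-contained sketch of the allocation pattern described 
above (this is not Kafka's actual assignment code and it simplifies follower 
placement, but it reproduces the leader bias):
{code:scala}
import scala.util.Random

// Round-robin assignment with a random starting offset: every topic's replica
// lists are rotations of the same broker order, so a given broker's partitions
// always fail over to the same "next" broker.
def assignReplicas(brokers: Seq[Int], numPartitions: Int, rf: Int): Seq[Seq[Int]] = {
  val start = Random.nextInt(brokers.size)
  (0 until numPartitions).map { p =>
    (0 until rf).map(r => brokers((start + p + r) % brokers.size))
  }
}

// e.g. assignReplicas(Seq(0, 1, 2), 3, 3) might give
// Seq(Seq(1, 2, 0), Seq(2, 0, 1), Seq(0, 1, 2)): the head of each list is the
// preferred leader, and with broker 1 offline every partition it led fails
// over to broker 2.
{code}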



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] yashmayya commented on a diff in pull request #12472: KAFKA-14134: Replace EasyMock with Mockito for WorkerConnectorTest

2022-08-09 Thread GitBox


yashmayya commented on code in PR #12472:
URL: https://github.com/apache/kafka/pull/12472#discussion_r941154092


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java:
##
@@ -672,6 +452,27 @@ protected void assertInitializedMetric(WorkerConnector 
workerConnector, String e
 assertEquals(VERSION, version);
 }
 
+@SuppressWarnings("unchecked")
+private Callback mockCallback() {
+return mock(Callback.class);
+}
+
+private void verifyCleanInitialize() {
+verify(connector).version();
+if (connector instanceof SourceConnector) {
+verify(offsetStore).start();
+verify(connector).initialize(any(SourceConnectorContext.class));
+} else {
+verify(connector).initialize(any(SinkConnectorContext.class));
+}
+}
+
+private void verifyCleanShutdown() {
+verify(ctx).close();
+verify(offsetStorageReader).close();

Review Comment:
   Good catch, thanks! I just translated the tests as-is and hadn't noticed 
that we were passing non-null offset storage reader and offset backing store 
instances even for sink connectors in these tests (differing from how the 
actual `Worker` instantiates `WorkerConnector`s).



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java:
##
@@ -120,43 +105,21 @@ public void testInitializeFailure() {
 workerConnector.doShutdown();
 assertStoppedMetric(workerConnector);
 
-verifyAll();
+verifyCleanInitialize();
+verify(listener).onFailure(CONNECTOR, exception);
+verify(listener).onShutdown(CONNECTOR);

Review Comment:
   The one for shutdown makes sense but I think with startup there are three 
different cases - 
   
   i) No call to `connector.start` or `listener.onStartup` (failure in 
initialization OR started in the paused state)
   ii) Calls to both `connector.start` and `listener.onStartup` (successful 
start)
   iii) Call to only `connector.start` (connector started in the paused state 
and then resumed OR failure on startup)
   
   I think it might be more readable to keep these verify calls in the test 
methods as is rather than trying to force fit them into (in)appropriately named 
methods, WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on a diff in pull request #12414: KAFKA-14073 Logging the reason for Snapshot

2022-08-09 Thread GitBox


dengziming commented on code in PR #12414:
URL: https://github.com/apache/kafka/pull/12414#discussion_r941141588


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:
##
@@ -59,11 +60,19 @@ class BrokerMetadataSnapshotter(
   val writer = writerBuilder.build(
 image.highestOffsetAndEpoch().offset,
 image.highestOffsetAndEpoch().epoch,
-lastContainedLogTime
-  )
+lastContainedLogTime)
   if (writer.nonEmpty) {
 _currentSnapshotOffset = image.highestOffsetAndEpoch().offset
-info(s"Creating a new snapshot at offset ${_currentSnapshotOffset}...")
+
+var stringReasons: Set[String] = Set()
+
+snapshotReasons.foreach(r => stringReasons += r.toString)
+
+if (stringReasons.isEmpty){
+  stringReasons += SnapshotReason.UnknownReason.toString
+}
+
+info(s"Creating a new snapshot at offset ${_currentSnapshotOffset} 
because, ${stringReasons.mkString(" and ")}")

Review Comment:
   We can directly use `snapshotReasons.mkString(" and ")` here and remove 
`stringReasons` and the related code.
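   
   A sketch of the simplified logging line (assuming the surrounding code stays 
as in the diff above; the final patch may differ):
   ```scala
   info(s"Creating a new snapshot at offset ${_currentSnapshotOffset} because: " +
     snapshotReasons.mkString(" and "))
   ```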



##
raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java:
##
@@ -107,6 +107,8 @@ public synchronized void handleCommit(BatchReader 
reader) {
 }
 log.debug("Counter incremented from {} to {}", initialCommitted, 
committed);
 
+// A snapshot is being taken here too, not being able to -
+// `import org.apache.kafka.metadata.utils.SnapshotReason`, figure 
out why?

Review Comment:
   The `raft` module only relies on the `clients` module so it can't import a 
class from the `metadata` module.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #12494: MINOR: Update site docs for ASF compliance

2022-08-09 Thread GitBox


mimaison commented on PR #12494:
URL: https://github.com/apache/kafka/pull/12494#issuecomment-1209131631

   The path of the images is `/{{version}}/images/`, so do we need to add them 
to this repo too?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode

2022-08-09 Thread GitBox


dajac commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r941077938


##
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##
@@ -1375,8 +1377,11 @@ class PartitionTest extends AbstractPartitionTest {
 assertEquals(alterPartitionListener.failures.get, 1)
   }
 
-  @Test
-  def testIsrNotExpandedIfReplicaIsFenced(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIsrNotExpandedIfReplicaIsFenced(quorum: String): Unit = {

Review Comment:
   nit: Should we update the test name as well?



##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
 metadataCache match {
   // In KRaft mode, only replicas which are not fenced nor in controlled 
shutdown are
-  // allowed to join the ISR. This does not apply to ZK mode.
+  // allowed to join the ISR. In ZK mode, we just ensure the broker is 
alive and not shutting down.

Review Comment:
   This basically means that the leader will retry adding back the 
shutting-down broker to the ISR until the shutting-down broker is removed from 
the metadata cache. It is worth noting that, during this time, other replicas 
cannot be added back to the ISR. The controller rejects any ISR expansion 
containing at least one ineligible replica. This is why we added that 
in-controller-shutdown state in KRaft. It allows the leader to filter them out 
as soon as possible.
   
   This may be acceptable here. Otherwise, we would have to propagate the 
shutting-down brokers via the UpdateMetadataRequest. What do others think?
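   
   For reference, the eligibility check under discussion would look roughly 
like this (a sketch under the assumption that the ZK branch only consults 
broker liveness in the metadata cache; see the PR for the final form):
   ```scala
   // Sketch only, not necessarily the merged code.
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
     metadataCache match {
       // KRaft: the replica must be neither fenced nor in controlled shutdown.
       case kRaftMetadataCache: KRaftMetadataCache =>
         !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
           !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId)
       // ZK: we only know whether the broker is alive.
       case zkMetadataCache: ZkMetadataCache =>
         zkMetadataCache.hasAliveBroker(followerReplicaId)
     }
   }
   ```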
   



##
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##
@@ -1045,6 +1049,78 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testAlterPartitionVersionSource"))
+  def testShutdownBrokerNotAddedToIsr(alterPartitionVersion: Short): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val otherBroker = servers.find(_.config.brokerId != controllerId).get
+    val brokerId = otherBroker.config.brokerId
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
+    val fullIsr = List(controllerId, brokerId)
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+
+    // Shut down follower.
+    servers(brokerId).shutdown()
+    servers(brokerId).awaitShutdown()
+
+    val controller = getController().kafkaController
+    val leaderIsrAndControllerEpochMap = controller.controllerContext.partitionsLeadershipInfo
+    val leaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds(tp.topic)
+    val controllerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+    // We expect only the controller (online broker) to be in ISR
+    assertEquals(List(controllerId), leaderAndIsr.isr)
+
+    val requestTopic = new AlterPartitionRequestData.TopicData()
+      .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+        .setPartitionIndex(tp.partition)
+        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
+        .setNewIsr(fullIsr.map(Int.box).asJava)
+        .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)).asJava)
+    if (alterPartitionVersion > 1) requestTopic.setTopicId(topicId) else requestTopic.setTopicName(tp.topic)
+
+    // Try to update ISR to contain the offline broker.
+    val alterPartitionRequest = new AlterPartitionRequestData()
+      .setBrokerId(controllerId)
+      .setBrokerEpoch(controllerEpoch)
+      .setTopics(Seq(requestTopic).asJava)
+
+    val future = new CompletableFuture[AlterPartitionResponseData]()
+    controller.eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      alterPartitionVersion,
+      future.complete
+    ))

Review Comment:
   nit: This piece of code is used in multiple places now. I wonder if it is 
worth pulling it out into a helper method. What do you think?
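   For instance, a helper along these lines could be extracted (the name and 
signature are only a suggestion, assuming it lives in ControllerIntegrationTest 
where the types used below are already in scope):

```scala
// Suggested helper, not part of the PR: submits an AlterPartition request to the
// controller event manager and returns a future that completes with the response.
private def sendAlterPartitionRequest(
  controller: KafkaController,
  request: AlterPartitionRequestData,
  version: Short
): CompletableFuture[AlterPartitionResponseData] = {
  val future = new CompletableFuture[AlterPartitionResponseData]()
  controller.eventManager.put(AlterPartitionReceived(request, version, future.complete))
  future
}
```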



[GitHub] [kafka] yashmayya commented on a diff in pull request #12490: KAFKA-14147: Prevent maps from growing monotonically in KafkaConfigBackingStore

2022-08-09 Thread GitBox


yashmayya commented on code in PR #12490:
URL: https://github.com/apache/kafka/pull/12490#discussion_r940928631


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -853,6 +853,9 @@ private void processConnectorConfigRecord(String connectorName, SchemaAndValue v
     connectorConfigs.remove(connectorName);
     connectorTaskCounts.remove(connectorName);
     taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
+    deferredTaskUpdates.remove(connectorName);
+    connectorTaskCountRecords.remove(connectorName);

Review Comment:
   To be honest, it doesn't really seem worth the mess of null handling 
everywhere. I'm going to back out this change and make this a single-line PR.
   
   Btw, unrelated to this PR - but I noticed that the task config generations 
aren't persisted to the config topic; they're simply maintained in the in-memory 
map. So, depending on worker restarts, different workers (herders, to be 
precise) could have different views of the task config generation for the same 
connector, because the config topic is compacted and older task commit records 
could be lost (the task config generation is incremented by 1 each time a task 
commit record is encountered). From what I can tell, task config generations are 
used in two places primarily:
   
   i) We compare the task config generation from before an EOS task startup with 
the task config generation after the task has initialized its transactional 
producer (and fail the task if there's a mismatch, indicating that a new set of 
tasks has been brought up).
   
   ii) At the end of a zombie fencing, if the task config generation is greater 
than the task config generation at the beginning of the zombie fencing, a `409` 
is returned by the fencing endpoint because a new set of tasks was brought up in 
the meantime.
   
   I'm struggling to think of any cases where this would cause an issue (i.e. 
different herders having different values for the task config generation of the 
same connector), but I was hoping that you could verify my understanding here.
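   To make the scenario concrete, here is a toy sketch (not the actual Connect 
herder or KafkaConfigBackingStore code; all names are illustrative) of the 
in-memory counter and the fencing check described above:

```scala
import scala.collection.mutable

object TaskConfigGenerationSketch {
  private val generations = mutable.Map.empty[String, Long]

  // Called each time a task commit record for `connector` is read back from the
  // (compacted) config topic. A restarted worker only replays the records that
  // survived compaction, so its counter can differ from another worker's.
  def onTaskCommitRecord(connector: String): Unit =
    generations.update(connector, generations.getOrElse(connector, 0L) + 1)

  def generation(connector: String): Long = generations.getOrElse(connector, 0L)

  // Sketch of check (ii): report 409 when new tasks were brought up while the
  // zombie fencing round was in flight, otherwise 200.
  def fencingStatus(generationAtStart: Long, generationAtEnd: Long): Int =
    if (generationAtEnd > generationAtStart) 409 else 200
}
```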



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org