[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-21 Thread GitBox


cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r637189573



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
##
@@ -75,87 +61,179 @@ trait ProducerIdGenerator {
   def shutdown() : Unit = {}
 }
 
-class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) 
extends ProducerIdGenerator with Logging {
-
-  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
-
-  private var currentProducerIdBlock: ProducerIdBlock = null
-  private var nextProducerId: Long = -1L
-
-  // grab the first block of producerIds
-  this synchronized {
-getNewProducerIdBlock()
-nextProducerId = currentProducerIdBlock.blockStartId
-  }
-
-  private def getNewProducerIdBlock(): Unit = {
+object ZkProducerIdManager {
+  def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: 
Logging): ProducerIdsBlock = {
+// Get or create the existing PID block from ZK and attempt to update it. 
We retry in a loop here since other
+// brokers may be generating PID blocks during a rolling upgrade
 var zkWriteComplete = false
 while (!zkWriteComplete) {
   // refresh current producerId block from zookeeper again
   val (dataOpt, zkVersion) = 
zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
 
   // generate the new producerId block
-  currentProducerIdBlock = dataOpt match {
+  val newProducerIdBlock = dataOpt match {
 case Some(data) =>
-  val currProducerIdBlock = 
ProducerIdManager.parseProducerIdBlockData(data)
-  debug(s"Read current producerId block $currProducerIdBlock, Zk path 
version $zkVersion")
+  val currProducerIdBlock = 
ProducerIdBlockZNode.parseProducerIdBlockData(data)
+  logger.debug(s"Read current producerId block $currProducerIdBlock, 
Zk path version $zkVersion")
 
-  if (currProducerIdBlock.blockEndId > Long.MaxValue - 
ProducerIdManager.PidBlockSize) {
+  if (currProducerIdBlock.producerIdEnd > Long.MaxValue - 
ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
 // we have exhausted all producerIds (wow!), treat it as a fatal 
error
-fatal(s"Exhausted all producerIds as the next block's end 
producerId is will has exceeded long type limit (current block end producerId 
is ${currProducerIdBlock.blockEndId})")
+logger.fatal(s"Exhausted all producerIds as the next block's end 
producerId is will has exceeded long type limit (current block end producerId 
is ${currProducerIdBlock.producerIdEnd})")
 throw new KafkaException("Have exhausted all producerIds.")
   }
 
-  ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, 
currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
+  new ProducerIdsBlock(brokerId, currProducerIdBlock.producerIdEnd + 
1L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
 case None =>
-  debug(s"There is no producerId block yet (Zk path version 
$zkVersion), creating the first block")
-  ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+  logger.debug(s"There is no producerId block yet (Zk path version 
$zkVersion), creating the first block")
+  new ProducerIdsBlock(brokerId, 0L, 
ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
   }
 
-  val newProducerIdBlockData = 
ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
+  val newProducerIdBlockData = 
ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock)
 
   // try to write the new producerId block into zookeeper
-  val (succeeded, version) = 
zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
-newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
+  val (succeeded, version) = 
zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, 
newProducerIdBlockData, zkVersion, None)
   zkWriteComplete = succeeded
 
-  if (zkWriteComplete)
-info(s"Acquired new producerId block $currentProducerIdBlock by 
writing to Zk with path version $version")
+  if (zkWriteComplete) {
+logger.info(s"Acquired new producerId block $newProducerIdBlock by 
writing to Zk with path version $version")
+return newProducerIdBlock
+  }
 }
+throw new IllegalStateException()
   }
+}
 
-  private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: 
String, expectedData: Array[Byte]): (Boolean, Int) = {
-try {
-  val expectedPidBlock = 
ProducerIdManager.parseProducerIdBlockData(expectedData)
-  zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match {
-case (Some(data), zkVersion) =>
-  val currProducerIdBLock = 
ProducerIdManager.parseProducerIdBlockData(data)
-  (currProducerIdBLock == expectedPidBlock, zkVersion)
-case (None, _) => (false, -1)
-  }
-} catch {
-  case e: Exception =>
-warn(s"Error

[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-19 Thread GitBox


cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635610364



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
##
@@ -75,87 +61,179 @@ trait ProducerIdGenerator {
   def shutdown() : Unit = {}
 }
 
-class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) 
extends ProducerIdGenerator with Logging {
-
-  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
-
-  private var currentProducerIdBlock: ProducerIdBlock = null
-  private var nextProducerId: Long = -1L
-
-  // grab the first block of producerIds
-  this synchronized {
-getNewProducerIdBlock()
-nextProducerId = currentProducerIdBlock.blockStartId
-  }
-
-  private def getNewProducerIdBlock(): Unit = {
+object ZkProducerIdManager {
+  def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: 
Logging): ProducerIdsBlock = {
+// Get or create the existing PID block from ZK and attempt to update it. 
We retry in a loop here since other
+// brokers may be generating PID blocks during a rolling upgrade
 var zkWriteComplete = false
 while (!zkWriteComplete) {
   // refresh current producerId block from zookeeper again
   val (dataOpt, zkVersion) = 
zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
 
   // generate the new producerId block
-  currentProducerIdBlock = dataOpt match {
+  val newProducerIdBlock = dataOpt match {
 case Some(data) =>
-  val currProducerIdBlock = 
ProducerIdManager.parseProducerIdBlockData(data)
-  debug(s"Read current producerId block $currProducerIdBlock, Zk path 
version $zkVersion")
+  val currProducerIdBlock = 
ProducerIdBlockZNode.parseProducerIdBlockData(data)
+  logger.debug(s"Read current producerId block $currProducerIdBlock, 
Zk path version $zkVersion")
 
-  if (currProducerIdBlock.blockEndId > Long.MaxValue - 
ProducerIdManager.PidBlockSize) {
+  if (currProducerIdBlock.producerIdEnd > Long.MaxValue - 
ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
 // we have exhausted all producerIds (wow!), treat it as a fatal 
error
-fatal(s"Exhausted all producerIds as the next block's end 
producerId is will has exceeded long type limit (current block end producerId 
is ${currProducerIdBlock.blockEndId})")
+logger.fatal(s"Exhausted all producerIds as the next block's end 
producerId is will has exceeded long type limit (current block end producerId 
is ${currProducerIdBlock.producerIdEnd})")
 throw new KafkaException("Have exhausted all producerIds.")
   }
 
-  ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, 
currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
+  new ProducerIdsBlock(brokerId, currProducerIdBlock.producerIdEnd + 
1L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
 case None =>
-  debug(s"There is no producerId block yet (Zk path version 
$zkVersion), creating the first block")
-  ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+  logger.debug(s"There is no producerId block yet (Zk path version 
$zkVersion), creating the first block")
+  new ProducerIdsBlock(brokerId, 0L, 
ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
   }
 
-  val newProducerIdBlockData = 
ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
+  val newProducerIdBlockData = 
ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock)
 
   // try to write the new producerId block into zookeeper
-  val (succeeded, version) = 
zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
-newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
+  val (succeeded, version) = 
zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, 
newProducerIdBlockData, zkVersion, None)
   zkWriteComplete = succeeded
 
-  if (zkWriteComplete)
-info(s"Acquired new producerId block $currentProducerIdBlock by 
writing to Zk with path version $version")
+  if (zkWriteComplete) {
+logger.info(s"Acquired new producerId block $newProducerIdBlock by 
writing to Zk with path version $version")
+return newProducerIdBlock
+  }
 }
+throw new IllegalStateException()
   }
+}
 
-  private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: 
String, expectedData: Array[Byte]): (Boolean, Int) = {
-try {
-  val expectedPidBlock = 
ProducerIdManager.parseProducerIdBlockData(expectedData)
-  zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match {
-case (Some(data), zkVersion) =>
-  val currProducerIdBLock = 
ProducerIdManager.parseProducerIdBlockData(data)
-  (currProducerIdBLock == expectedPidBlock, zkVersion)
-case (None, _) => (false, -1)
-  }
-} catch {
-  case e: Exception =>
-warn(s"Error

[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-19 Thread GitBox


cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635585021



##
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##
@@ -330,10 +332,21 @@ class KafkaServer(
 groupCoordinator = GroupCoordinator(config, replicaManager, 
Time.SYSTEM, metrics)
 groupCoordinator.startup(() => 
zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions))
 
+/* create producer ids manager */
+val producerIdManager = if 
(config.interBrokerProtocolVersion.isAllocateProducerIdsSupported) {
+  ProducerIdGenerator(

Review comment:
   It seems a bit weird that both function calls have the same name. 
Can't we just create the subclass we want directly here, for clarity? One is 
ZkWhatever, the other is RpcWhatever, etc.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-19 Thread GitBox


cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635585021



##
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##
@@ -330,10 +332,21 @@ class KafkaServer(
 groupCoordinator = GroupCoordinator(config, replicaManager, 
Time.SYSTEM, metrics)
 groupCoordinator.startup(() => 
zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions))
 
+/* create producer ids manager */
+val producerIdManager = if 
(config.interBrokerProtocolVersion.isAllocateProducerIdsSupported) {
+  ProducerIdGenerator(

Review comment:
   It seems a bit weird that both function calls have the same name. 
Can't we just create the subclass we want directly here, for clarity? One is 
ZkWhatever, the other is ForwardingWhatever, etc.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-19 Thread GitBox


cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635582909



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
##
@@ -75,87 +61,179 @@ trait ProducerIdGenerator {
   def shutdown() : Unit = {}
 }
 
-class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) 
extends ProducerIdGenerator with Logging {
-
-  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
-
-  private var currentProducerIdBlock: ProducerIdBlock = null
-  private var nextProducerId: Long = -1L
-
-  // grab the first block of producerIds
-  this synchronized {
-getNewProducerIdBlock()
-nextProducerId = currentProducerIdBlock.blockStartId
-  }
-
-  private def getNewProducerIdBlock(): Unit = {
+object ZkProducerIdManager {
+  def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: 
Logging): ProducerIdsBlock = {
+// Get or create the existing PID block from ZK and attempt to update it. 
We retry in a loop here since other
+// brokers may be generating PID blocks during a rolling upgrade
 var zkWriteComplete = false
 while (!zkWriteComplete) {
   // refresh current producerId block from zookeeper again
   val (dataOpt, zkVersion) = 
zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
 
   // generate the new producerId block
-  currentProducerIdBlock = dataOpt match {
+  val newProducerIdBlock = dataOpt match {
 case Some(data) =>
-  val currProducerIdBlock = 
ProducerIdManager.parseProducerIdBlockData(data)
-  debug(s"Read current producerId block $currProducerIdBlock, Zk path 
version $zkVersion")
+  val currProducerIdBlock = 
ProducerIdBlockZNode.parseProducerIdBlockData(data)
+  logger.debug(s"Read current producerId block $currProducerIdBlock, 
Zk path version $zkVersion")
 
-  if (currProducerIdBlock.blockEndId > Long.MaxValue - 
ProducerIdManager.PidBlockSize) {
+  if (currProducerIdBlock.producerIdEnd > Long.MaxValue - 
ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
 // we have exhausted all producerIds (wow!), treat it as a fatal 
error
-fatal(s"Exhausted all producerIds as the next block's end 
producerId is will has exceeded long type limit (current block end producerId 
is ${currProducerIdBlock.blockEndId})")
+logger.fatal(s"Exhausted all producerIds as the next block's end 
producerId is will has exceeded long type limit (current block end producerId 
is ${currProducerIdBlock.producerIdEnd})")
 throw new KafkaException("Have exhausted all producerIds.")
   }
 
-  ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, 
currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
+  new ProducerIdsBlock(brokerId, currProducerIdBlock.producerIdEnd + 
1L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
 case None =>
-  debug(s"There is no producerId block yet (Zk path version 
$zkVersion), creating the first block")
-  ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+  logger.debug(s"There is no producerId block yet (Zk path version 
$zkVersion), creating the first block")
+  new ProducerIdsBlock(brokerId, 0L, 
ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
   }
 
-  val newProducerIdBlockData = 
ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
+  val newProducerIdBlockData = 
ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock)
 
   // try to write the new producerId block into zookeeper
-  val (succeeded, version) = 
zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
-newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
+  val (succeeded, version) = 
zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, 
newProducerIdBlockData, zkVersion, None)
   zkWriteComplete = succeeded
 
-  if (zkWriteComplete)
-info(s"Acquired new producerId block $currentProducerIdBlock by 
writing to Zk with path version $version")
+  if (zkWriteComplete) {
+logger.info(s"Acquired new producerId block $newProducerIdBlock by 
writing to Zk with path version $version")
+return newProducerIdBlock
+  }
 }
+throw new IllegalStateException()
   }
+}
 
-  private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: 
String, expectedData: Array[Byte]): (Boolean, Int) = {
-try {
-  val expectedPidBlock = 
ProducerIdManager.parseProducerIdBlockData(expectedData)
-  zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match {
-case (Some(data), zkVersion) =>
-  val currProducerIdBLock = 
ProducerIdManager.parseProducerIdBlockData(data)
-  (currProducerIdBLock == expectedPidBlock, zkVersion)
-case (None, _) => (false, -1)
-  }
-} catch {
-  case e: Exception =>
-warn(s"Error

[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-19 Thread GitBox


cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635578464



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
##
@@ -75,87 +61,179 @@ trait ProducerIdGenerator {
   def shutdown() : Unit = {}
 }
 
-class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) 
extends ProducerIdGenerator with Logging {
-
-  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
-
-  private var currentProducerIdBlock: ProducerIdBlock = null
-  private var nextProducerId: Long = -1L
-
-  // grab the first block of producerIds
-  this synchronized {
-getNewProducerIdBlock()
-nextProducerId = currentProducerIdBlock.blockStartId
-  }
-
-  private def getNewProducerIdBlock(): Unit = {
+object ZkProducerIdManager {
+  def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: 
Logging): ProducerIdsBlock = {
+// Get or create the existing PID block from ZK and attempt to update it. 
We retry in a loop here since other
+// brokers may be generating PID blocks during a rolling upgrade
 var zkWriteComplete = false
 while (!zkWriteComplete) {
   // refresh current producerId block from zookeeper again
   val (dataOpt, zkVersion) = 
zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
 
   // generate the new producerId block
-  currentProducerIdBlock = dataOpt match {
+  val newProducerIdBlock = dataOpt match {
 case Some(data) =>
-  val currProducerIdBlock = 
ProducerIdManager.parseProducerIdBlockData(data)
-  debug(s"Read current producerId block $currProducerIdBlock, Zk path 
version $zkVersion")
+  val currProducerIdBlock = 
ProducerIdBlockZNode.parseProducerIdBlockData(data)
+  logger.debug(s"Read current producerId block $currProducerIdBlock, 
Zk path version $zkVersion")
 
-  if (currProducerIdBlock.blockEndId > Long.MaxValue - 
ProducerIdManager.PidBlockSize) {
+  if (currProducerIdBlock.producerIdEnd > Long.MaxValue - 
ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
 // we have exhausted all producerIds (wow!), treat it as a fatal 
error
-fatal(s"Exhausted all producerIds as the next block's end 
producerId is will has exceeded long type limit (current block end producerId 
is ${currProducerIdBlock.blockEndId})")
+logger.fatal(s"Exhausted all producerIds as the next block's end 
producerId is will has exceeded long type limit (current block end producerId 
is ${currProducerIdBlock.producerIdEnd})")
 throw new KafkaException("Have exhausted all producerIds.")
   }
 
-  ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, 
currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
+  new ProducerIdsBlock(brokerId, currProducerIdBlock.producerIdEnd + 
1L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
 case None =>
-  debug(s"There is no producerId block yet (Zk path version 
$zkVersion), creating the first block")
-  ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+  logger.debug(s"There is no producerId block yet (Zk path version 
$zkVersion), creating the first block")
+  new ProducerIdsBlock(brokerId, 0L, 
ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
   }
 
-  val newProducerIdBlockData = 
ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
+  val newProducerIdBlockData = 
ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock)
 
   // try to write the new producerId block into zookeeper
-  val (succeeded, version) = 
zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
-newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
+  val (succeeded, version) = 
zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, 
newProducerIdBlockData, zkVersion, None)
   zkWriteComplete = succeeded
 
-  if (zkWriteComplete)
-info(s"Acquired new producerId block $currentProducerIdBlock by 
writing to Zk with path version $version")
+  if (zkWriteComplete) {
+logger.info(s"Acquired new producerId block $newProducerIdBlock by 
writing to Zk with path version $version")
+return newProducerIdBlock
+  }
 }
+throw new IllegalStateException()
   }
+}
 
-  private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: 
String, expectedData: Array[Byte]): (Boolean, Int) = {
-try {
-  val expectedPidBlock = 
ProducerIdManager.parseProducerIdBlockData(expectedData)
-  zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match {
-case (Some(data), zkVersion) =>
-  val currProducerIdBLock = 
ProducerIdManager.parseProducerIdBlockData(data)
-  (currProducerIdBLock == expectedPidBlock, zkVersion)
-case (None, _) => (false, -1)
-  }
-} catch {
-  case e: Exception =>
-warn(s"Error

[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-19 Thread GitBox


cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635572907



##
File path: core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
##
@@ -248,5 +261,17 @@ public void stop() {
 clusterReference.get().tearDown();
 }
 }
+
+@Override
+public void rollingBrokerRestart() {
+if (started.get()) {
+for (int i = 0; i < clusterReference.get().brokerCount(); i++) 
{

Review comment:
   It seems like there should be a log message here before we kill the 
broker, to make it easier to see what is going on?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-19 Thread GitBox


cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635572503



##
File path: core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
##
@@ -248,5 +261,17 @@ public void stop() {
 clusterReference.get().tearDown();
 }
 }
+
+@Override
+public void rollingBrokerRestart() {
+if (started.get()) {
+for (int i = 0; i < clusterReference.get().brokerCount(); i++) 
{
+clusterReference.get().killBroker(i);
+}
+clusterReference.get().restartDeadBrokers(true);
+} else {

Review comment:
   It would be a bit nicer to move this section to the front, to get rid of 
the nesting here.

##
File path: core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
##
@@ -248,5 +261,17 @@ public void stop() {
 clusterReference.get().tearDown();
 }
 }
+
+@Override
+public void rollingBrokerRestart() {
+if (started.get()) {
+for (int i = 0; i < clusterReference.get().brokerCount(); i++) 
{
+clusterReference.get().killBroker(i);
+}
+clusterReference.get().restartDeadBrokers(true);
+} else {

Review comment:
   It would be a bit nicer to move this section to the beginning, to get 
rid of the nesting here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-19 Thread GitBox


cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635532912



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -2376,6 +2375,82 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def allocateProducerIds(allocateProducerIdsRequest: 
AllocateProducerIdsRequestData,
+  callback: AllocateProducerIdsResponseData => Unit): 
Unit = {
+
+def eventManagerCallback(results: Either[Errors, ProducerIdsBlock]): Unit 
= {
+  results match {
+case Left(error) => callback.apply(new 
AllocateProducerIdsResponseData().setErrorCode(error.code))
+case Right(pidBlock) => callback.apply(
+  new AllocateProducerIdsResponseData()
+.setProducerIdStart(pidBlock.producerIdStart())
+.setProducerIdLen(pidBlock.producerIdLen()))
+  }
+}
+eventManager.put(AllocateProducerIds(allocateProducerIdsRequest.brokerId,
+  allocateProducerIdsRequest.brokerEpoch, eventManagerCallback))
+  }
+
+  def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: 
Either[Errors, ProducerIdsBlock] => Unit): Unit = {
+// Handle a few short-circuits
+if (!isActive) {
+  callback.apply(Left(Errors.NOT_CONTROLLER))
+  return
+}
+
+val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+if (brokerEpochOpt.isEmpty) {
+  warn(s"Ignoring AllocateProducerIds due to unknown broker $brokerId")
+  callback.apply(Left(Errors.STALE_BROKER_EPOCH))

Review comment:
   I think that because the quorum controller and the zk controller are two 
implementations of the same API protocol, they should share the same error 
codes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-19 Thread GitBox


cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635532912



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -2376,6 +2375,82 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def allocateProducerIds(allocateProducerIdsRequest: 
AllocateProducerIdsRequestData,
+  callback: AllocateProducerIdsResponseData => Unit): 
Unit = {
+
+def eventManagerCallback(results: Either[Errors, ProducerIdsBlock]): Unit 
= {
+  results match {
+case Left(error) => callback.apply(new 
AllocateProducerIdsResponseData().setErrorCode(error.code))
+case Right(pidBlock) => callback.apply(
+  new AllocateProducerIdsResponseData()
+.setProducerIdStart(pidBlock.producerIdStart())
+.setProducerIdLen(pidBlock.producerIdLen()))
+  }
+}
+eventManager.put(AllocateProducerIds(allocateProducerIdsRequest.brokerId,
+  allocateProducerIdsRequest.brokerEpoch, eventManagerCallback))
+  }
+
+  def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: 
Either[Errors, ProducerIdsBlock] => Unit): Unit = {
+// Handle a few short-circuits
+if (!isActive) {
+  callback.apply(Left(Errors.NOT_CONTROLLER))
+  return
+}
+
+val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+if (brokerEpochOpt.isEmpty) {
+  warn(s"Ignoring AllocateProducerIds due to unknown broker $brokerId")
+  callback.apply(Left(Errors.STALE_BROKER_EPOCH))

Review comment:
   The quorum controller and the zk controller are two implementations of 
the same API protocol. It is always fine (in fact, it's necessary) to reuse 
error codes for both where appropriate.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-11 Thread GitBox


cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r630587756



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -2376,6 +2375,82 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def allocateProducerIds(allocateProducerIdsRequest: 
AllocateProducerIdsRequestData,
+  callback: AllocateProducerIdsResponseData => Unit): 
Unit = {
+
+def eventManagerCallback(results: Either[Errors, ProducerIdsBlock]): Unit 
= {
+  results match {
+case Left(error) => callback.apply(new 
AllocateProducerIdsResponseData().setErrorCode(error.code))
+case Right(pidBlock) => callback.apply(
+  new AllocateProducerIdsResponseData()
+.setProducerIdStart(pidBlock.producerIdStart())
+.setProducerIdLen(pidBlock.producerIdLen()))
+  }
+}
+eventManager.put(AllocateProducerIds(allocateProducerIdsRequest.brokerId,
+  allocateProducerIdsRequest.brokerEpoch, eventManagerCallback))
+  }
+
+  def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: 
Either[Errors, ProducerIdsBlock] => Unit): Unit = {
+// Handle a few short-circuits
+if (!isActive) {
+  callback.apply(Left(Errors.NOT_CONTROLLER))
+  return
+}
+
+val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+if (brokerEpochOpt.isEmpty) {
+  warn(s"Ignoring AllocateProducerIds due to unknown broker $brokerId")
+  callback.apply(Left(Errors.STALE_BROKER_EPOCH))

Review comment:
   It seems like this should be BROKER_ID_NOT_REGISTERED




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-11 Thread GitBox


cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r630569991



##
File path: 
clients/src/main/resources/common/message/AllocateProducerIdsResponse.json
##
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 67,
+  "type": "response",
+  "name": "AllocateProducerIdsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+  "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+  "about": "The top level response error code" },
+{ "name": "ProducerIdStart", "type": "int64", "versions": "0+",

Review comment:
   I think this should have entity type `producerId`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org