dajac commented on a change in pull request #11452: URL: https://github.com/apache/kafka/pull/11452#discussion_r754969939
########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ########## @@ -404,10 +405,12 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets, final ConsumerGroupMetadata groupMetadata) { ensureTransactional(); + throwIfPendingState("sendOffsetsToTransaction"); maybeFailWithError(); - if (currentState != State.IN_TRANSACTION) - throw new KafkaException("Cannot send offsets to transaction either because the producer is not in an " + - "active transaction"); + + if (currentState != State.IN_TRANSACTION) { + throw new KafkaException("Cannot send offsets to transaction either because in state " + currentState); Review comment: nit: It seems that we could remove `either`. ########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ########## @@ -423,34 +426,31 @@ public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Ma return handler.result; } - public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) { - if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition)) - return; + public synchronized void maybeAddPartition(TopicPartition topicPartition) { + maybeFailWithError(); + throwIfPendingState("send"); - log.debug("Begin adding new partition {} to transaction", topicPartition); - topicPartitionBookkeeper.addPartition(topicPartition); - newPartitionsInTransaction.add(topicPartition); + if (isTransactional()) { + if (!hasProducerId()) { + throw new KafkaException("Cannot add partition " + topicPartition + + "to transaction before completing a call to initTransactions"); + } else if (currentState != State.IN_TRANSACTION) { + throw new KafkaException("Cannot add partition " + topicPartition + Review comment: Why are we changing to using `KafkaException` here? I understand that you use `KafkaException` consistently in all the cases now but why? It might be worth checking the javadoc of the methods in `KafkaProducer` if we stick to `KafkaException`. ########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ########## @@ -423,34 +426,31 @@ public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Ma return handler.result; } - public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) { - if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition)) - return; + public synchronized void maybeAddPartition(TopicPartition topicPartition) { + maybeFailWithError(); + throwIfPendingState("send"); - log.debug("Begin adding new partition {} to transaction", topicPartition); - topicPartitionBookkeeper.addPartition(topicPartition); - newPartitionsInTransaction.add(topicPartition); + if (isTransactional()) { + if (!hasProducerId()) { + throw new KafkaException("Cannot add partition " + topicPartition + + "to transaction before completing a call to initTransactions"); Review comment: nit: Missing a space before `to`. ########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ########## @@ -1183,20 +1183,42 @@ private TxnOffsetCommitHandler txnOffsetCommitHandler(TransactionalRequestResult return new TxnOffsetCommitHandler(result, builder); } + private void throwPendingTransitionError(String operation) { + throw new KafkaException("Cannot attempt operation `" + operation + "` " Review comment: Intuitively, I would have raised an `IllegalStateException` here. Is there a reason not to? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org