dajac commented on a change in pull request #11452:
URL: https://github.com/apache/kafka/pull/11452#discussion_r754969939



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -404,10 +405,12 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult
     public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
                                                                             final ConsumerGroupMetadata groupMetadata) {
         ensureTransactional();
+        throwIfPendingState("sendOffsetsToTransaction");
         maybeFailWithError();
-        if (currentState != State.IN_TRANSACTION)
-            throw new KafkaException("Cannot send offsets to transaction either because the producer is not in an " +
-                    "active transaction");
+
+        if (currentState != State.IN_TRANSACTION) {
+            throw new KafkaException("Cannot send offsets to transaction either because in state " + currentState);

Review comment:
       nit: It seems that we could remove `either`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -423,34 +426,31 @@ public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Ma
         return handler.result;
     }
 
-    public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) {
-        if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
-            return;
+    public synchronized void maybeAddPartition(TopicPartition topicPartition) {
+        maybeFailWithError();
+        throwIfPendingState("send");
 
-        log.debug("Begin adding new partition {} to transaction", topicPartition);
-        topicPartitionBookkeeper.addPartition(topicPartition);
-        newPartitionsInTransaction.add(topicPartition);
+        if (isTransactional()) {
+            if (!hasProducerId()) {
+                throw new KafkaException("Cannot add partition " + topicPartition +
+                    "to transaction before completing a call to initTransactions");
+            } else if (currentState != State.IN_TRANSACTION) {
+                throw new KafkaException("Cannot add partition " + topicPartition +

Review comment:
       Why are we switching to `KafkaException` here? I understand that you now use `KafkaException` consistently in all cases, but why?
   
   It might be worth checking the javadoc of the methods in `KafkaProducer` if we stick with `KafkaException`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -423,34 +426,31 @@ public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Ma
         return handler.result;
     }
 
-    public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) {
-        if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
-            return;
+    public synchronized void maybeAddPartition(TopicPartition topicPartition) {
+        maybeFailWithError();
+        throwIfPendingState("send");
 
-        log.debug("Begin adding new partition {} to transaction", topicPartition);
-        topicPartitionBookkeeper.addPartition(topicPartition);
-        newPartitionsInTransaction.add(topicPartition);
+        if (isTransactional()) {
+            if (!hasProducerId()) {
+                throw new KafkaException("Cannot add partition " + topicPartition +
+                    "to transaction before completing a call to initTransactions");

Review comment:
       nit: Missing a space before `to`.
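
       To illustrate the nit, here is a minimal standalone sketch of the concatenation. It assumes a plain `String` in place of `TopicPartition`; the class and method names are hypothetical and only the two string literals come from the diff.

```java
class MessageSpacing {
    // Mirrors the concatenation in the diff: the second literal starts with "to",
    // so it runs directly into the partition name.
    static String withoutSpace(String topicPartition) {
        return "Cannot add partition " + topicPartition +
            "to transaction before completing a call to initTransactions";
    }

    // Suggested fix: start the second literal with a space.
    static String withSpace(String topicPartition) {
        return "Cannot add partition " + topicPartition +
            " to transaction before completing a call to initTransactions";
    }

    public static void main(String[] args) {
        // "my-topic-0" is an illustrative partition name, not taken from the PR.
        System.out.println(withoutSpace("my-topic-0"));
        System.out.println(withSpace("my-topic-0"));
    }
}
```

       With the first variant the partition name and `to` are fused in the message (`my-topic-0to transaction`); the second reads as intended.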

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1183,20 +1183,42 @@ private TxnOffsetCommitHandler txnOffsetCommitHandler(TransactionalRequestResult
         return new TxnOffsetCommitHandler(result, builder);
     }
 
+    private void throwPendingTransitionError(String operation) {
+        throw new KafkaException("Cannot attempt operation `" + operation + "` "

Review comment:
       Intuitively, I would have raised an `IllegalStateException` here. Is 
there a reason not to?
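
       For reference, a rough sketch of what the `IllegalStateException` variant could look like. Everything here is an assumption for illustration: the class name, the `pendingTransition` flag, and the message wording are hypothetical and not taken from the pull request.

```java
class PendingStateGuard {
    // Hypothetical flag standing in for whatever the manager tracks as a pending transition.
    private boolean pendingTransition;

    void setPendingTransition(boolean pending) {
        this.pendingTransition = pending;
    }

    // Rejects the named operation while a transition is pending.
    // The message wording is illustrative, not the text from the pull request.
    void throwIfPendingState(String operation) {
        if (pendingTransition) {
            throw new IllegalStateException(
                "Operation `" + operation + "` is not allowed while a state transition is pending");
        }
    }

    public static void main(String[] args) {
        PendingStateGuard guard = new PendingStateGuard();
        guard.throwIfPendingState("send"); // nothing pending, returns normally
        guard.setPendingTransition(true);
        try {
            guard.throwIfPendingState("send");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

       One thing to weigh: both `KafkaException` and `IllegalStateException` are unchecked, but a caller that catches only `KafkaException` would not catch this one.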





