artemlivshits commented on code in PR #16840:
URL: https://github.com/apache/kafka/pull/16840#discussion_r1724057107
##########
clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java:
##########
@@ -115,6 +134,18 @@ public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<Suppor
}
this.supportedFeatures = Collections.unmodifiableMap(supportedFeaturesBuilder);
this.zkMigrationEnabled = zkMigrationEnabled;
+
+ this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
+ if (finalizedFeaturesEpoch == -1) {
+ this.finalizedFeatures = Collections.emptyMap();
+ return;
+ }
+
+ Map<String, Short> finalizedFeaturesBuilder = new HashMap<>();
Review Comment:
Do we need a separate map? I think we can just put the entries directly into
`finalizedFeatures`.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -368,15 +370,21 @@ public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Ma
"(currentState= " + currentState + ")");
}
- log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, groupMetadata);
- AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(
- new AddOffsetsToTxnRequestData()
- .setTransactionalId(transactionalId)
- .setProducerId(producerIdAndEpoch.producerId)
- .setProducerEpoch(producerIdAndEpoch.epoch)
- .setGroupId(groupMetadata.groupId())
- );
- AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets, groupMetadata);
+ TxnRequestHandler handler;
+ if (coordinatorSupportsTransactionV2) {
+ log.debug("Begin adding offsets {} for consumer group {} to transaction with transaction protocol V2", offsets, groupMetadata);
+ handler = txnOffsetCommitHandler(null, offsets, groupMetadata, ApiKeys.TXN_OFFSET_COMMIT.latestVersion());
+ } else {
+ log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, groupMetadata);
+ AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(
+ new AddOffsetsToTxnRequestData()
+ .setTransactionalId(transactionalId)
+ .setProducerId(producerIdAndEpoch.producerId)
+ .setProducerEpoch(producerIdAndEpoch.epoch)
+ .setGroupId(groupMetadata.groupId())
+ );
+ handler = new AddOffsetsToTxnHandler(builder, offsets, groupMetadata);
Review Comment:
I presume that once the txn v2 protocol is released, the latest version will be
the default, but we need to use a previous version because v2 is not enabled.
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -395,6 +403,8 @@ public synchronized void maybeAddPartition(TopicPartition topicPartition) {
" to transaction while in state " + currentState);
} else if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition)) {
return;
+ } else if (coordinatorSupportsTransactionV2) {
+ txnPartitionMap.getOrCreate(topicPartition);
Review Comment:
Why not just add directly to the `partitionsInTransaction` set here?
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -980,6 +1002,28 @@ void handleCoordinatorReady() {
null;
this.coordinatorSupportsBumpingEpoch = initProducerIdVersion != null && initProducerIdVersion.maxVersion() >= 3;
+
+ if (nodeApiVersions == null) return;
+
+ if (nodeApiVersions.finalizedFeatures() != null) {
+ /*
+ To enable the transaction V2, it requires:
+ 1. transaction.version finalized version >= 2
Review Comment:
We should check every node's API versions (not just the coordinator's) and keep
track of the latest transaction version (where "latest" means the value with
the highest epoch). The logic would be something like this:
1. When we get API versions from any node, check whether the epoch stored in
the transaction manager is less than the new epoch.
2. If the epoch stored in the transaction manager is less than the new
epoch, store the new value and the new epoch in the transaction manager.
3. Have a method `boolean coordinatorSupportsTransactionV2() { return
this.transactionVersion >= 2; }`
This way we learn the cluster-wide consensus on the transaction version, rather
than relying on individual brokers that may have different values, so our view
doesn't jump back and forth (if the coordinator moves, for example).
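The three steps above could look roughly like the following. This is only a sketch: the class `TransactionVersionTracker`, its field names, and the use of -1 and 0 as "unknown" defaults are assumptions for illustration, not code from the PR.

```java
// Hypothetical helper, not part of the PR: remembers the transaction version
// reported at the highest finalized-features epoch seen from any node.
final class TransactionVersionTracker {
    // Assumption: -1 means no epoch has been observed yet.
    private long latestEpoch = -1L;
    // Assumption: 0 means the transaction.version feature is not finalized.
    private short transactionVersion = 0;

    // Intended to be called whenever API versions arrive from any node,
    // not only from the transaction coordinator.
    synchronized void maybeUpdate(long finalizedFeaturesEpoch, short finalizedTransactionVersion) {
        // Steps 1 and 2: only a strictly newer epoch replaces the stored value.
        if (latestEpoch < finalizedFeaturesEpoch) {
            latestEpoch = finalizedFeaturesEpoch;
            transactionVersion = finalizedTransactionVersion;
        }
    }

    // Step 3: the decision depends only on the latest known version.
    synchronized boolean coordinatorSupportsTransactionV2() {
        return transactionVersion >= 2;
    }
}
```

With this shape, a response from a broker that still reports an older epoch is ignored, which is what keeps the view from flipping when the coordinator moves.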
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -904,16 +906,21 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
}
String transactionalId = null;
+ short maxProduceRequestVersion = ApiKeys.PRODUCE.latestVersion();
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
+ if (!transactionManager.isTransactionV2Enabled()) {
+ maxProduceRequestVersion = ProduceRequest.LAST_BEFORE_TRANSACTION_V2_VERSION;
Review Comment:
What if `ApiKeys.PRODUCE.latestVersion()` is less than
`ProduceRequest.LAST_BEFORE_TRANSACTION_V2_VERSION`?
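One possible guard, sketched with plain `short` values in place of the real `ApiKeys` and `ProduceRequest` constants, is to clamp with `Math.min` instead of assigning the constant directly. The class `ProduceVersionCap` and its method are hypothetical names used only for this illustration.

```java
// Hypothetical helper, not part of the PR.
final class ProduceVersionCap {
    // Returns the highest produce version the client should use.
    // When V2 is disabled the cap never exceeds latestVersion, even if the
    // "last before V2" constant happens to be larger.
    static short maxProduceVersion(short latestVersion,
                                   short lastBeforeTransactionV2,
                                   boolean transactionV2Enabled) {
        if (transactionV2Enabled) {
            return latestVersion;
        }
        return (short) Math.min(latestVersion, lastBeforeTransactionV2);
    }
}
```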
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -980,6 +1002,28 @@ void handleCoordinatorReady() {
null;
this.coordinatorSupportsBumpingEpoch = initProducerIdVersion != null && initProducerIdVersion.maxVersion() >= 3;
+
+ if (nodeApiVersions == null) return;
+
+ if (nodeApiVersions.finalizedFeatures() != null) {
+ /*
+ To enable the transaction V2, it requires:
+ 1. transaction.version finalized version >= 2
+ 2. The ProduceRequest max version > ProducerRequest.LAST_BEFORE_TRANSACTION_V2_VERSION
Review Comment:
Why would we check the produce request version on the coordinator? We could
be producing to a different broker that could have a different produce version
than the coordinator. Ditto for OffsetCommitRequest.