[ https://issues.apache.org/jira/browse/KAFKA-19883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andrew Schofield reassigned KAFKA-19883:
----------------------------------------
Assignee: Andrew Schofield
> Support Hierarchical Transactional Acknowledgments for Share Groups
> -------------------------------------------------------------------
>
> Key: KAFKA-19883
> URL: https://issues.apache.org/jira/browse/KAFKA-19883
> Project: Kafka
> Issue Type: New Feature
> Components: clients, consumer, core
> Affects Versions: 4.2.0
> Reporter: Shekhar Prasad Rajak
> Assignee: Andrew Schofield
> Priority: Major
>
> Add transactional acknowledgment support to Kafka Share Groups to enable
> +*exactly-once read semantics*+ in distributed stream processing engines like
> Apache Spark and Apache Flink.
>
> *Current Behavior:*
> Share Groups support only non-transactional acknowledgement (the implicit and
> explicit modes). Once an acknowledgement is sent to the broker, on the next
> poll(), commitSync() or commitAsync() call, it is final and cannot be undone.
> This creates data-loss scenarios in distributed streaming frameworks:
> 1. Worker polls and acknowledges records to Kafka
> 2. Records are marked as acknowledged and are never redelivered to the share group
> 3. Checkpoint/batch fails before writing to sink
> 4. Records are lost (acknowledged but never reached sink)
> This prevents exactly-once semantics in Spark/Flink with Share Groups.
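> A minimal in-memory sketch of steps 1-4 follows. It does not use the Kafka
> client: ImmediateAckPartition and ImmediateAckLossDemo are hypothetical names,
> and each record is reduced to its offset plus whether the share group would
> still redeliver it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of today's non-transactional behaviour:
// once an acknowledgement reaches the broker it is final.
class ImmediateAckPartition {
    private final Set<Long> acknowledged = new HashSet<>();

    // True if the share group would still deliver this record again.
    boolean isRedeliverable(long offset) {
        return !acknowledged.contains(offset);
    }

    // Models acknowledge(record, ACCEPT) after it has been sent to the broker.
    void acknowledge(long offset) {
        acknowledged.add(offset);
    }
}

class ImmediateAckLossDemo {
    // Returns the offsets that are neither in the sink nor redeliverable, i.e. lost.
    static List<Long> run(List<Long> polled, boolean sinkWriteFails) {
        ImmediateAckPartition partition = new ImmediateAckPartition();
        List<Long> sink = new ArrayList<>();
        // Steps 1-2: the worker acknowledges the polled records straight away.
        for (long offset : polled) {
            partition.acknowledge(offset);
        }
        // Step 3: the checkpoint/batch either reaches the sink or fails.
        if (!sinkWriteFails) {
            sink.addAll(polled);
        }
        // Step 4: anything acknowledged but absent from the sink is lost.
        List<Long> lost = new ArrayList<>();
        for (long offset : polled) {
            if (!sink.contains(offset) && !partition.isRedeliverable(offset)) {
                lost.add(offset);
            }
        }
        return lost;
    }
}
```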
> *Expected Behavior:*
> Share Groups should support transactional acknowledgment mode where:
> 1. Records are acknowledged within transactions
> 2. Acknowledgments are pending until transaction commits
> 3. Rolling back a transaction makes ALL of its records available for redelivery
> 4. Acknowledgements can be committed atomically with framework checkpoints/batches
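> The expected behaviour can be modelled as a small state machine for one
> share-partition. This is a hypothetical sketch, not broker code:
> TransactionalAckPartition, RecordState and the state names are illustrative,
> and only ACCEPT acknowledgements are modelled.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical record states for one share-partition (illustrative names).
enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

// Hypothetical model: acknowledgements made inside a transaction stay pending
// and only take effect when that transaction commits.
class TransactionalAckPartition {
    private final Map<Long, RecordState> states = new HashMap<>();
    private final Map<String, Set<Long>> pendingByTxn = new HashMap<>();

    TransactionalAckPartition(long... offsets) {
        for (long offset : offsets) {
            states.put(offset, RecordState.AVAILABLE);
        }
    }

    RecordState state(long offset) {
        return states.get(offset);
    }

    // A consumer fetches the record; it is locked until the record is resolved.
    void acquire(long offset) {
        if (states.get(offset) != RecordState.AVAILABLE) {
            throw new IllegalStateException("offset " + offset + " is not available");
        }
        states.put(offset, RecordState.ACQUIRED);
    }

    // Expected behaviours 1-2: the ACCEPT is recorded against the transaction only.
    void acknowledge(String txnId, long offset) {
        if (states.get(offset) != RecordState.ACQUIRED) {
            throw new IllegalStateException("offset " + offset + " is not acquired");
        }
        pendingByTxn.computeIfAbsent(txnId, id -> new HashSet<>()).add(offset);
    }

    // Pending acknowledgements become final only on commit.
    void commit(String txnId) {
        for (long offset : pendingByTxn.getOrDefault(txnId, Set.of())) {
            states.put(offset, RecordState.ACKNOWLEDGED);
        }
        pendingByTxn.remove(txnId);
    }

    // Expected behaviour 3: rollback releases every record the transaction acknowledged.
    void rollback(String txnId) {
        for (long offset : pendingByTxn.getOrDefault(txnId, Set.of())) {
            states.put(offset, RecordState.AVAILABLE);
        }
        pendingByTxn.remove(txnId);
    }
}
```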
> *Proposed Solution:*
> Implement hierarchical two-phase commit transactions:
> {quote} BatchTransaction (coordinator/driver level)
> ├─ TaskTransaction1 (worker/executor level)
> │ ├─ acknowledge(record1, ACCEPT)
> │ ├─ acknowledge(record2, ACCEPT)
> │ └─ prepare() ← Phase 1
> │
> ├─ TaskTransaction2 (worker/executor level)
> │ ├─ acknowledge(record3, ACCEPT)
> │ └─ prepare() ← Phase 1
> │
> └─ commit() ← Phase 2: Atomically commits all task transactions
> {quote}
>
> - Batch transactions (coordinator-level)
> - Task transactions (worker-level)
> - Records remain eligible for redelivery until the transaction commits
> - Rollback on failure → no data loss
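> The two-phase rule in the tree above can be sketched as follows, assuming
> (hypothetically) that the batch transaction tracks its task transactions in
> memory and that a task which never called prepare() counts as a "no" vote.
> TaskTxnSketch and BatchTxnSketch are illustrative names, not the proposed
> BatchTransaction/TaskTransaction types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical task-level transaction: collects acknowledgements, then votes via prepare().
class TaskTxnSketch {
    enum State { OPEN, PREPARED, COMMITTED, ROLLED_BACK }

    final String taskId;
    final List<Long> pendingAcks = new ArrayList<>();
    State state = State.OPEN;

    TaskTxnSketch(String taskId) {
        this.taskId = taskId;
    }

    void acknowledge(long offset) {
        if (state != State.OPEN) throw new IllegalStateException(taskId + " is " + state);
        pendingAcks.add(offset);
    }

    // Phase 1: the worker votes "yes" by preparing its task transaction.
    void prepare() {
        if (state != State.OPEN) throw new IllegalStateException(taskId + " is " + state);
        state = State.PREPARED;
    }
}

// Hypothetical batch-level coordinator for the task transactions above.
class BatchTxnSketch {
    private final Map<String, TaskTxnSketch> tasks = new LinkedHashMap<>();
    final List<Long> committedAcks = new ArrayList<>();

    TaskTxnSketch beginTask(String taskId) {
        TaskTxnSketch task = new TaskTxnSketch(taskId);
        tasks.put(taskId, task);
        return task;
    }

    // Phase 2: commit every task transaction only if all are prepared;
    // otherwise roll all of them back so their records can be redelivered.
    boolean commit() {
        boolean allPrepared = tasks.values().stream()
                .allMatch(t -> t.state == TaskTxnSketch.State.PREPARED);
        for (TaskTxnSketch task : tasks.values()) {
            if (allPrepared) {
                committedAcks.addAll(task.pendingAcks);
                task.state = TaskTxnSketch.State.COMMITTED;
            } else {
                task.state = TaskTxnSketch.State.ROLLED_BACK;
            }
        }
        return allPrepared;
    }
}
```

> Making the decision all-or-nothing at the batch level is what prevents one
> failed task from leaving the other tasks' acknowledgements half-applied.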
>
> *API Changes:*
> New Consumer API:
> // Batch-level transaction
> BatchTransaction beginBatchTransaction(String batchTransactionId);
> void commitBatchTransaction(BatchTransaction txn, Duration timeout)
> throws TimeoutException, TransactionException;
> void rollbackBatchTransaction(BatchTransaction txn);
> // Task-level transaction
> TaskTransaction beginTaskTransaction(BatchTransaction parent, String taskId);
> void acknowledge(TaskTransaction txn, ConsumerRecord<K, V> record, AcknowledgeType type);
> void prepareTaskTransaction(TaskTransaction txn);
> void commitTaskTransaction(TaskTransaction txn);
> void rollbackTaskTransaction(TaskTransaction txn);
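> A driver-side call sequence for one micro-batch might look like the sketch
> below. ShareTxnApi, BatchTxn and TaskTxn are hypothetical stand-ins for a subset
> of the proposed methods (records are reduced to offsets and ACCEPT is implied),
> RecordingShareTxnApi is a test double that only logs calls, and the 30-second
> commit timeout simply mirrors the proposed share.transaction.commit.timeout.ms
> default.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for the proposed BatchTransaction and TaskTransaction types.
interface BatchTxn { String id(); }
interface TaskTxn { String taskId(); }

// Hypothetical subset of the proposed API; records are offsets and ACCEPT is implied.
interface ShareTxnApi {
    BatchTxn beginBatchTransaction(String batchTransactionId);
    TaskTxn beginTaskTransaction(BatchTxn parent, String taskId);
    void acknowledge(TaskTxn txn, long offset);
    void prepareTaskTransaction(TaskTxn txn);
    void commitBatchTransaction(BatchTxn txn, Duration timeout);
    void rollbackBatchTransaction(BatchTxn txn);
}

final class DriverFlow {
    // One micro-batch: each task writes its records to the sink, acknowledges them
    // and prepares; then the batch is committed. Any unchecked failure (the proposed
    // TimeoutException/TransactionException are assumed unchecked) rolls it back.
    static boolean runBatch(ShareTxnApi api, String batchId,
                            List<List<Long>> offsetsPerTask, Consumer<List<Long>> writeToSink) {
        BatchTxn batch = api.beginBatchTransaction(batchId);
        try {
            for (int i = 0; i < offsetsPerTask.size(); i++) {
                List<Long> offsets = offsetsPerTask.get(i);
                TaskTxn task = api.beginTaskTransaction(batch, "task-" + i);
                writeToSink.accept(offsets);
                for (long offset : offsets) {
                    api.acknowledge(task, offset);
                }
                api.prepareTaskTransaction(task);                       // Phase 1
            }
            api.commitBatchTransaction(batch, Duration.ofSeconds(30));  // Phase 2
            return true;
        } catch (RuntimeException e) {
            api.rollbackBatchTransaction(batch);
            return false;
        }
    }
}

// Test double that only records the calls it receives.
class RecordingShareTxnApi implements ShareTxnApi {
    final List<String> calls = new ArrayList<>();

    public BatchTxn beginBatchTransaction(String id) { calls.add("begin:" + id); return () -> id; }
    public TaskTxn beginTaskTransaction(BatchTxn parent, String taskId) { calls.add("task:" + taskId); return () -> taskId; }
    public void acknowledge(TaskTxn txn, long offset) { calls.add("ack:" + txn.taskId() + ":" + offset); }
    public void prepareTaskTransaction(TaskTxn txn) { calls.add("prepare:" + txn.taskId()); }
    public void commitBatchTransaction(BatchTxn txn, Duration timeout) { calls.add("commit:" + txn.id()); }
    public void rollbackBatchTransaction(BatchTxn txn) { calls.add("rollback:" + txn.id()); }
}
```

> The loop runs sequentially here for brevity; in Spark or Flink the per-task
> part would presumably run on executors/TaskManagers, with only the final
> commit or rollback issued by the driver/JobManager.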
> *New Configuration:*
> share.acknowledgement.mode=transactional # New value; existing values are implicit (default) and explicit
> share.batch.transaction.timeout.ms=300000
> share.task.transaction.timeout.ms=120000
> share.transaction.commit.timeout.ms=30000
> share.transaction.auto.rollback=true
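> The proposed keys could be read with java.util.Properties as sketched below.
> ShareTxnConfigSketch is a hypothetical holder; the fallback values are the
> defaults listed above, and the check that a task transaction timeout does not
> exceed its batch timeout is an assumption, not part of the proposal.

```java
import java.util.Properties;

// Hypothetical holder for the proposed share transaction settings (none exist in Kafka today).
final class ShareTxnConfigSketch {
    final long batchTimeoutMs;
    final long taskTimeoutMs;
    final long commitTimeoutMs;
    final boolean autoRollback;

    // Reads the proposed keys, falling back to the proposed defaults.
    private ShareTxnConfigSketch(Properties props) {
        batchTimeoutMs = Long.parseLong(props.getProperty("share.batch.transaction.timeout.ms", "300000"));
        taskTimeoutMs = Long.parseLong(props.getProperty("share.task.transaction.timeout.ms", "120000"));
        commitTimeoutMs = Long.parseLong(props.getProperty("share.transaction.commit.timeout.ms", "30000"));
        autoRollback = Boolean.parseBoolean(props.getProperty("share.transaction.auto.rollback", "true"));
    }

    static ShareTxnConfigSketch from(Properties props) {
        ShareTxnConfigSketch cfg = new ShareTxnConfigSketch(props);
        // Assumption, not in the proposal: a task transaction should not outlive its batch.
        if (cfg.taskTimeoutMs > cfg.batchTimeoutMs) {
            throw new IllegalArgumentException("share.task.transaction.timeout.ms ("
                    + cfg.taskTimeoutMs + ") exceeds share.batch.transaction.timeout.ms ("
                    + cfg.batchTimeoutMs + ")");
        }
        return cfg;
    }
}
```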
>
> *Target Use Cases:* Apache Spark structured streaming, Apache Flink
> exactly-once checkpoints, any coordinator-worker streaming framework.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)