[ 
https://issues.apache.org/jira/browse/KAFKA-19883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield reassigned KAFKA-19883:
----------------------------------------

    Assignee: Andrew Schofield

> Support Hierarchical Transactional Acknowledgments for Share Groups
> -------------------------------------------------------------------
>
>                 Key: KAFKA-19883
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19883
>             Project: Kafka
>          Issue Type: New Feature
>          Components: clients, consumer, core
>    Affects Versions: 4.2.0
>            Reporter: Shekhar Prasad Rajak
>            Assignee: Andrew Schofield
>            Priority: Major
>
> Add transactional acknowledgment support to Kafka Share Groups to enable 
> +*exactly-once read semantics*+ in distributed stream processing engines like 
> Apache Spark and Apache Flink.
>  
>  *Current Behavior:*
>   Share Groups support only immediate acknowledgment mode where records are 
> permanently acknowledged as soon as consumer.acknowledge() is called. This 
> creates data loss scenarios in distributed streaming frameworks:
>   1. Worker polls and acknowledges records to Kafka
>   2. Acknowledged records are never redelivered to the share group
>   3. Checkpoint/batch fails before writing to sink
>   4. Records are lost (acknowledged but never reached sink)
>   This prevents exactly-once semantics in Spark/Flink with Share Groups.
>   *Expected Behavior:*
>   Share Groups should support transactional acknowledgment mode where:
>   1. Records are acknowledged within transactions
>   2. Acknowledgments remain pending until the transaction commits
>   3. A transaction rollback makes ALL of its records available for retry
>   4. Acknowledgments can be coordinated atomically with framework 
> checkpoints/batches
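
A minimal sketch of the pending-acknowledgment behavior listed above, written as a plain in-memory Java model. `PendingAckLedger` and its `State` enum are hypothetical names introduced only for illustration; they are not part of Kafka or of the API proposed in this issue, and real broker-side state handling would likely differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model, not Kafka's API: acknowledgments stay pending
// until the surrounding transaction commits or rolls back.
class PendingAckLedger {
    enum State { AVAILABLE, PENDING, ACKNOWLEDGED }

    private final Map<Long, State> states = new LinkedHashMap<>();

    // Register a delivered record (by offset) as available for acknowledgment.
    void deliver(long offset) {
        states.put(offset, State.AVAILABLE);
    }

    // Acknowledge inside the transaction: the record only becomes PENDING.
    void acknowledge(long offset) {
        if (states.get(offset) != State.AVAILABLE) {
            throw new IllegalStateException("offset " + offset + " is not available");
        }
        states.put(offset, State.PENDING);
    }

    // Commit: every pending acknowledgment becomes permanent.
    void commit() {
        states.replaceAll((offset, s) -> s == State.PENDING ? State.ACKNOWLEDGED : s);
    }

    // Rollback: pending acknowledgments are dropped so the records can be retried.
    void rollback() {
        states.replaceAll((offset, s) -> s == State.PENDING ? State.AVAILABLE : s);
    }

    State stateOf(long offset) {
        return states.get(offset);
    }
}

class PendingAckDemo {
    public static void main(String[] args) {
        PendingAckLedger ledger = new PendingAckLedger();
        ledger.deliver(0L);
        ledger.acknowledge(0L);
        ledger.rollback();                       // e.g. the sink write failed
        System.out.println(ledger.stateOf(0L));  // prints AVAILABLE
        ledger.acknowledge(0L);
        ledger.commit();                         // the retry succeeded
        System.out.println(ledger.stateOf(0L));  // prints ACKNOWLEDGED
    }
}
```

The point of the model is that nothing becomes permanent before `commit()`, so a failed checkpoint leaves every record retryable.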
>   *Proposed Solution:*
>   Implement hierarchical two-phase commit transactions:
> {quote}  BatchTransaction (coordinator/driver level)
>     ├─ TaskTransaction1 (worker/executor level)
>     │    ├─ acknowledge(record1, ACCEPT)
>     │    ├─ acknowledge(record2, ACCEPT)
>     │    └─ prepare() ← Phase 1
>     │
>     ├─ TaskTransaction2 (worker/executor level)
>     │    ├─ acknowledge(record3, ACCEPT)
>     │    └─ prepare() ← Phase 1
>     │
>     └─ commit() ← Phase 2: Atomically commits all task transactions
> {quote}
>  
>   - Batch transactions (coordinator-level)
>   - Task transactions (worker-level)
>   - Records remain available for redelivery until the transaction commits
>   - Rollback on failure → no data loss
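
The hierarchy above can be sketched as a small in-memory two-phase commit. This is only an illustration under stated assumptions: `BatchTxn`, `TaskTxn`, and their methods are hypothetical stand-ins for the proposed `BatchTransaction` and `TaskTransaction`, and the rule "roll back every task if any task is not prepared" is an assumption about how Phase 2 might behave, not something the issue specifies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical worker-level transaction (illustrative names only).
class TaskTxn {
    enum Phase { ACTIVE, PREPARED, COMMITTED, ROLLED_BACK }

    final String taskId;
    final List<Long> ackedOffsets = new ArrayList<>();
    Phase phase = Phase.ACTIVE;

    TaskTxn(String taskId) {
        this.taskId = taskId;
    }

    // Record an acknowledgment; allowed only before prepare().
    void acknowledge(long offset) {
        if (phase != Phase.ACTIVE) {
            throw new IllegalStateException("task " + taskId + " is " + phase);
        }
        ackedOffsets.add(offset);
    }

    // Phase 1: the task declares that its acknowledgments are ready to commit.
    void prepare() {
        if (phase != Phase.ACTIVE) {
            throw new IllegalStateException("task " + taskId + " is " + phase);
        }
        phase = Phase.PREPARED;
    }
}

// Hypothetical coordinator-level transaction that owns the task transactions.
class BatchTxn {
    final String batchId;
    private final List<TaskTxn> tasks = new ArrayList<>();

    BatchTxn(String batchId) {
        this.batchId = batchId;
    }

    TaskTxn beginTask(String taskId) {
        TaskTxn task = new TaskTxn(taskId);
        tasks.add(task);
        return task;
    }

    // Phase 2: commit all tasks only if every one is prepared;
    // otherwise roll all of them back (assumed all-or-nothing rule).
    boolean commit() {
        boolean allPrepared = tasks.stream()
                .allMatch(t -> t.phase == TaskTxn.Phase.PREPARED);
        TaskTxn.Phase outcome =
                allPrepared ? TaskTxn.Phase.COMMITTED : TaskTxn.Phase.ROLLED_BACK;
        for (TaskTxn t : tasks) {
            t.phase = outcome;
        }
        return allPrepared;
    }

    // Offsets whose acknowledgments became permanent.
    List<Long> committedOffsets() {
        List<Long> out = new ArrayList<>();
        for (TaskTxn t : tasks) {
            if (t.phase == TaskTxn.Phase.COMMITTED) {
                out.addAll(t.ackedOffsets);
            }
        }
        return out;
    }
}
```

In this model a single unprepared task causes the whole batch to roll back, which mirrors the atomicity the diagram describes for the driver-level `commit()`.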
>  
>   *API Changes:*
>   New Consumer API:
>   // Batch-level transaction
>   BatchTransaction beginBatchTransaction(String batchTransactionId);
>   void commitBatchTransaction(BatchTransaction txn, Duration timeout) 
>       throws TimeoutException, TransactionException;
>   void rollbackBatchTransaction(BatchTransaction txn);
>   // Task-level transaction
>   TaskTransaction beginTaskTransaction(BatchTransaction parent, String taskId);
>   void acknowledge(TaskTransaction txn, ConsumerRecord<K,V> record, AcknowledgeType type);
>   void prepareTaskTransaction(TaskTransaction txn);
>   void commitTaskTransaction(TaskTransaction txn);
>   void rollbackTaskTransaction(TaskTransaction txn);
>   *New Configuration:*
>   share.acknowledgement.mode=transactional  # Default: explicit
>   share.batch.transaction.timeout.ms=300000
>   share.task.transaction.timeout.ms=120000
>   share.transaction.commit.timeout.ms=30000
>   share.transaction.auto.rollback=true
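
As an illustration, the proposed settings could be assembled into client properties as below. This is a sketch under assumptions: the `share.*transaction*` keys and the `transactional` mode value exist only in this proposal, the broker address and group name are placeholders, and the `timeoutsAreOrdered` check (commit timeout < task timeout < batch timeout) is an assumed sanity rule inferred from the example values, not a stated requirement.

```java
import java.util.Properties;

// Hypothetical helper; the share.*transaction* keys are proposed, not released.
class TransactionalShareConfigSketch {

    static Properties proposedConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "example-share-group");     // placeholder group
        // Values copied from the proposal in this issue.
        props.setProperty("share.acknowledgement.mode", "transactional");
        props.setProperty("share.batch.transaction.timeout.ms", "300000");
        props.setProperty("share.task.transaction.timeout.ms", "120000");
        props.setProperty("share.transaction.commit.timeout.ms", "30000");
        props.setProperty("share.transaction.auto.rollback", "true");
        return props;
    }

    // Assumed sanity rule: commit timeout < task timeout < batch timeout.
    static boolean timeoutsAreOrdered(Properties props) {
        long batch = Long.parseLong(props.getProperty("share.batch.transaction.timeout.ms"));
        long task = Long.parseLong(props.getProperty("share.task.transaction.timeout.ms"));
        long commit = Long.parseLong(props.getProperty("share.transaction.commit.timeout.ms"));
        return commit < task && task < batch;
    }
}
```

With the proposed example values the ordering check passes, since 30000 < 120000 < 300000.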
>  
> *Target Use Cases:* Apache Spark structured streaming, Apache Flink 
> exactly-once checkpoints, any coordinator-worker streaming framework.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
