[ 
https://issues.apache.org/jira/browse/KAFKA-19883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-19883:
----------------------------------
    Labels: queues-for-kafka  (was: )

> Support Hierarchical Transactional Acknowledgments for Share Groups
> -------------------------------------------------------------------
>
>                 Key: KAFKA-19883
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19883
>             Project: Kafka
>          Issue Type: New Feature
>          Components: clients, consumer, core
>    Affects Versions: 4.2.0
>            Reporter: Shekhar Prasad Rajak
>            Assignee: Andrew Schofield
>            Priority: Major
>              Labels: queues-for-kafka
>
> Add transactional acknowledgment support to Kafka Share Groups to enable 
> +*exactly-once read semantics*+ in distributed stream processing engines such as 
> Apache Spark and Apache Flink.
> The exactly-once guarantee comes from transactional acknowledgments: no record is 
> permanently acknowledged until the framework's checkpoint has committed.
>  
>  *Current Behavior:*
>   Share Groups support only immediate acknowledgment mode where records are 
> permanently acknowledged as soon as consumer.acknowledge() is called. This 
> creates data loss scenarios in distributed streaming frameworks:
>   1. Worker polls and acknowledges records to Kafka
>   2. Records are permanently removed from Kafka
>   3. Checkpoint/batch fails before writing to sink
>   4. Records are lost (acknowledged but never reached sink)
>   This prevents exactly-once semantics in Spark/Flink with Share Groups.
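>   For reference, here is a minimal sketch of today's explicit-acknowledgment flow with 
> KafkaShareConsumer (KIP-932); the topic name, group id and bootstrap servers are 
> placeholders. Once commitSync() returns, the acknowledgments are final, so a later 
> checkpoint failure cannot bring the records back:
> ```
> import java.time.Duration;
> import java.util.List;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.AcknowledgeType;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaShareConsumer;
> 
> public class ImmediateAckExample {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");    // placeholder
>         props.put("group.id", "orders-share-group");         // placeholder share group
>         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("share.acknowledgement.mode", "explicit"); // per-record acknowledgments
> 
>         try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
>             consumer.subscribe(List.of("events"));           // topic name is illustrative
>             ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
>             for (ConsumerRecord<String, String> record : records) {
>                 // ... write the record to the sink here ...
>                 consumer.acknowledge(record, AcknowledgeType.ACCEPT);
>             }
>             // Acknowledgments become permanent here, regardless of whether the
>             // framework's checkpoint later succeeds.
>             consumer.commitSync();
>         }
>     }
> }
> ```
>  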
>   *Expected Behavior:*
>   Share Groups should support transactional acknowledgment mode where:
>   1. Records are acknowledged within transactions
>   2. Acknowledgments are pending until transaction commits
>   3. Transaction rollback makes ALL records available for retry
>   4. Acknowledgments can be coordinated atomically with framework checkpoints/batches
>   *Proposed Solution:*
>   Implement hierarchical two-phase commit transactions:
> {quote}  BatchTransaction (coordinator/driver level)
>     ├─ TaskTransaction1 (worker/executor level)
>     │    ├─ acknowledge(record1, ACCEPT)
>     │    ├─ acknowledge(record2, ACCEPT)
>     │    └─ prepare() ← Phase 1
>     │
>     ├─ TaskTransaction2 (worker/executor level)
>     │    ├─ acknowledge(record3, ACCEPT)
>     │    └─ prepare() ← Phase 1
>     │
>     └─ commit() ← Phase 2: Atomically commits all task transactions
> {quote}
>  
>   - Batch transactions (coordinator-level)
>   - Task transactions (worker-level)
>   - Records stay in Kafka until transaction commits
>   - Rollback on failure → no data loss
>  
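>   As an illustration, the coordinator side (Spark driver / Flink JobManager) might drive the 
> batch transaction roughly as sketched below. This is a sketch against the proposed API only; 
> waitForAllTasksPrepared(), checkpointSucceeded() and shareConsumer are illustrative stand-ins 
> for framework hooks and for whichever client object ends up exposing these calls.
> ```
> // Coordinator (driver/JobManager) side, sketched against the proposed API.
> // waitForAllTasksPrepared() and checkpointSucceeded() are illustrative framework hooks.
> BatchTransaction batchTxn = shareConsumer.beginBatchTransaction("checkpoint-CP1");
> try {
>     // Workers acknowledge records inside their own task transactions and call
>     // prepareTaskTransaction() when done (Phase 1) -- see the worker sketch further down.
>     waitForAllTasksPrepared(batchTxn);
>     if (checkpointSucceeded()) {
>         // Phase 2: atomically commits every prepared task transaction.
>         shareConsumer.commitBatchTransaction(batchTxn, Duration.ofSeconds(30));
>     } else {
>         // Rollback: every acknowledged record becomes available again.
>         shareConsumer.rollbackBatchTransaction(batchTxn);
>     }
> } catch (Exception e) {
>     shareConsumer.rollbackBatchTransaction(batchTxn);
> }
> ```
>  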
> Example: Recovery After Failure
>  
> ```
>   Checkpoint CP1 Processing:
>     Time 10:00 - Consumer 1 polls, gets records [A, B, C] (offsets 1000, 1001, 1002)
>     Time 10:01 - Consumer 2 polls, gets records [D, E, F] (offsets 1003, 1004, 1005)
>     Time 10:02 - Consumer 1 acknowledges A, B, C in transaction "checkpoint-CP1"
>     Time 10:03 - Consumer 2 acknowledges D, E, F in transaction "checkpoint-CP1"
>     Time 10:04 - Consumer 1 crashes before prepare()
>   Broker State:
>     Transaction "checkpoint-CP1": ACTIVE (not prepared)
>     Records in transaction:
>       - A (offset 1000): LOCKED → transaction lock
>       - B (offset 1001): LOCKED → transaction lock
>       - C (offset 1002): LOCKED → transaction lock
>       - D (offset 1003): LOCKED → transaction lock
>       - E (offset 1004): LOCKED → transaction lock
>       - F (offset 1005): LOCKED → transaction lock
>   Recovery (JobManager detects failure):
>     Time 10:05 - JobManager aborts checkpoint CP1
>     Time 10:06 - Rollback transaction "checkpoint-CP1"
>                → A, B, C, D, E, F become AVAILABLE again
>     Time 10:07 - Restore from checkpoint CP1-1
>     Time 10:08 - Consumer 1 (new instance) polls
>                → Kafka returns available records
>                → Might get [A, D, B] (different order than before!)
>     Time 10:09 - Consumer 2 polls
>                → Might get [C, E, F]
> ```
>   *API Changes:*
>   New Consumer API:
>   // Batch-level transaction
>   BatchTransaction beginBatchTransaction(String batchTransactionId);
>   void commitBatchTransaction(BatchTransaction txn, Duration timeout) 
>       throws TimeoutException, TransactionException;
>   void rollbackBatchTransaction(BatchTransaction txn);
>   // Task-level transaction
>   TaskTransaction beginTaskTransaction(BatchTransaction parent, String taskId);
>   void acknowledge(TaskTransaction txn, ConsumerRecord<K,V> record, AcknowledgeType type);
>   void prepareTaskTransaction(TaskTransaction txn);
>   void commitTaskTransaction(TaskTransaction txn);
>   void rollbackTaskTransaction(TaskTransaction txn);
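>   For illustration, a worker (executor / TaskManager) might use the task-level calls above 
> roughly as follows during one checkpoint; parentBatchTxn is assumed to be propagated from 
> the coordinator, and the task id, deserializer types and sink write are placeholders:
> ```
> // Worker (executor/TaskManager) side, sketched against the proposed API above.
> // parentBatchTxn is assumed to have been handed down from the coordinator.
> TaskTransaction taskTxn = shareConsumer.beginTaskTransaction(parentBatchTxn, "task-7");
> try {
>     ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofSeconds(1));
>     for (ConsumerRecord<String, String> record : records) {
>         // ... stage the record for the sink here ...
>         // The acknowledgment stays pending inside the task transaction.
>         shareConsumer.acknowledge(taskTxn, record, AcknowledgeType.ACCEPT);
>     }
>     shareConsumer.prepareTaskTransaction(taskTxn);    // Phase 1: this task votes to commit
>     // Phase 2 happens when the coordinator commits or rolls back the batch transaction.
> } catch (Exception e) {
>     shareConsumer.rollbackTaskTransaction(taskTxn);   // records become available for retry
> }
> ```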
>   *New Configuration:*
>   share.acknowledgement.mode=transactional  # Default: explicit
>   share.batch.transaction.timeout.ms=300000
>   share.task.transaction.timeout.ms=120000
>   share.transaction.commit.timeout.ms=30000
>   share.transaction.auto.rollback=true
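>   For illustration, the proposed settings would sit alongside the usual share-consumer 
> properties, e.g. (bootstrap servers and group id are placeholders):
> ```
> // Illustrative consumer configuration combining existing and proposed properties.
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");            // placeholder
> props.put("group.id", "orders-share-group");                 // placeholder share group
> props.put("share.acknowledgement.mode", "transactional");    // proposed value
> props.put("share.batch.transaction.timeout.ms", "300000");   // proposed
> props.put("share.task.transaction.timeout.ms", "120000");    // proposed
> props.put("share.transaction.commit.timeout.ms", "30000");   // proposed
> props.put("share.transaction.auto.rollback", "true");        // proposed
> ```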
>  
> *Target Use Cases:* Apache Spark structured streaming, Apache Flink 
> exactly-once checkpoints, any coordinator-worker streaming framework.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
