[
https://issues.apache.org/jira/browse/KAFKA-19883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Apoorv Mittal updated KAFKA-19883:
----------------------------------
Labels: queues-for-kafka (was: )
> Support Hierarchical Transactional Acknowledgments for Share Groups
> -------------------------------------------------------------------
>
> Key: KAFKA-19883
> URL: https://issues.apache.org/jira/browse/KAFKA-19883
> Project: Kafka
> Issue Type: New Feature
> Components: clients, consumer, core
> Affects Versions: 4.2.0
> Reporter: Shekhar Prasad Rajak
> Assignee: Andrew Schofield
> Priority: Major
> Labels: queues-for-kafka
>
> Add transactional acknowledgment support to Kafka Share Groups to enable
> +*exactly-once read semantics*+ in distributed stream processing engines like
> Apache Spark and Apache Flink.
> The exactly-once guarantee comes from transactional acknowledgments: no record
> is permanently acknowledged until the framework's checkpoint commits.
>
> *Current Behavior:*
> Share Groups support only immediate acknowledgment, where records are
> permanently acknowledged as soon as the acknowledgment recorded by
> consumer.acknowledge() is committed (implicitly on the next poll, or via
> commitSync()/commitAsync()). This creates data-loss scenarios in distributed
> streaming frameworks:
> 1. Worker polls and acknowledges records to Kafka
> 2. Records are permanently removed from Kafka
> 3. Checkpoint/batch fails before writing to sink
> 4. Records are lost (acknowledged but never reached sink)
> This prevents exactly-once semantics in Spark/Flink with Share Groups.
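>
> For illustration, a minimal sketch of today's flow with the existing
> KafkaShareConsumer (KIP-932); the topic name and the sink call are
> placeholders. Once the acknowledgments are committed, they are permanent,
> regardless of whether the framework's checkpoint later succeeds:
>
> ```
> import java.time.Duration;
> import java.util.List;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.AcknowledgeType;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.KafkaShareConsumer;
>
> public class ImmediateAckToday {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("group.id", "my-share-group");
>         props.put("key.deserializer",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("share.acknowledgement.mode", "explicit"); // per-record acknowledge()
>
>         try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
>             consumer.subscribe(List.of("events"));           // placeholder topic
>             while (true) {
>                 for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
>                     bufferForSink(record);                   // not yet durable in the sink
>                     consumer.acknowledge(record, AcknowledgeType.ACCEPT);
>                 }
>                 consumer.commitSync(); // acknowledgments are now permanent; a later
>                                        // checkpoint failure cannot bring these records back
>             }
>         }
>     }
>
>     static void bufferForSink(ConsumerRecord<String, String> record) { /* framework-specific */ }
> }
> ```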
> *Expected Behavior:*
> Share Groups should support transactional acknowledgment mode where:
> 1. Records are acknowledged within transactions
> 2. Acknowledgments are pending until transaction commits
> 3. Transaction rollback makes ALL records available for retry
> 4. Commit and rollback coordinate atomically with framework checkpoints/batches
> *Proposed Solution:*
> Implement hierarchical two-phase commit transactions:
> {quote} BatchTransaction (coordinator/driver level)
> ├─ TaskTransaction1 (worker/executor level)
> │ ├─ acknowledge(record1, ACCEPT)
> │ ├─ acknowledge(record2, ACCEPT)
> │ └─ prepare() ← Phase 1
> │
> ├─ TaskTransaction2 (worker/executor level)
> │ ├─ acknowledge(record3, ACCEPT)
> │ └─ prepare() ← Phase 1
> │
> └─ commit() ← Phase 2: Atomically commits all task transactions
> {quote}
>
> - Batch transactions (coordinator-level)
> - Task transactions (worker-level)
> - Records stay in Kafka until transaction commits
> - Rollback on failure → no data loss
>
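> A minimal sketch of the intended call pattern, assuming the
> BatchTransaction/TaskTransaction API proposed below. None of these types or
> methods exist in Kafka today, so they are expressed as a standalone
> hypothetical interface purely to make the two-phase flow concrete (the checked
> exceptions from the proposed signatures are omitted for brevity):
>
> ```
> import java.time.Duration;
> import org.apache.kafka.clients.consumer.AcknowledgeType;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
>
> // Hypothetical handles and consumer-side surface mirroring the proposed API.
> interface BatchTransaction {}
> interface TaskTransaction {}
>
> interface TransactionalShareConsumer<K, V> {
>     BatchTransaction beginBatchTransaction(String batchTransactionId);
>     TaskTransaction beginTaskTransaction(BatchTransaction parent, String taskId);
>     void acknowledge(TaskTransaction txn, ConsumerRecord<K, V> record, AcknowledgeType type);
>     void prepareTaskTransaction(TaskTransaction txn);                     // Phase 1
>     void commitBatchTransaction(BatchTransaction txn, Duration timeout);  // Phase 2
>     void rollbackBatchTransaction(BatchTransaction txn);
> }
>
> class CheckpointSketch {
>     // One checkpoint = one batch transaction spanning the task transactions of all workers.
>     void runCheckpoint(TransactionalShareConsumer<String, String> consumer,
>                        Iterable<ConsumerRecord<String, String>> polledRecords) {
>         BatchTransaction batch = consumer.beginBatchTransaction("checkpoint-CP1");
>         try {
>             TaskTransaction task = consumer.beginTaskTransaction(batch, "task-0");
>             for (ConsumerRecord<String, String> record : polledRecords) {
>                 // Pending acknowledgment: the record stays locked in Kafka, not removed.
>                 consumer.acknowledge(task, record, AcknowledgeType.ACCEPT);
>             }
>             consumer.prepareTaskTransaction(task);                           // Phase 1: task is ready
>             consumer.commitBatchTransaction(batch, Duration.ofSeconds(30));  // Phase 2: atomic commit
>         } catch (RuntimeException e) {
>             consumer.rollbackBatchTransaction(batch); // all acknowledged records become AVAILABLE again
>             throw e;
>         }
>     }
> }
> ```
>
> In a Spark or Flink job, the coordinator/driver would own the batch
> transaction while each worker owns its task transaction; the sketch collapses
> both roles into one method for brevity.
>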
> *Example: Recovery After Failure*
>
> ```
> Checkpoint CP1 Processing:
>   Time 10:00 - Consumer 1 polls, gets records [A, B, C] (offsets 1000, 1001, 1002)
>   Time 10:01 - Consumer 2 polls, gets records [D, E, F] (offsets 1003, 1004, 1005)
>   Time 10:02 - Consumer 1 acknowledges A, B, C in transaction "checkpoint-CP1"
>   Time 10:03 - Consumer 2 acknowledges D, E, F in transaction "checkpoint-CP1"
>   Time 10:04 - Consumer 1 crashes before prepare()
>
> Broker State:
>   Transaction "checkpoint-CP1": ACTIVE (not prepared)
>   Records in transaction:
>     - A (offset 1000): LOCKED → transaction lock
>     - B (offset 1001): LOCKED → transaction lock
>     - C (offset 1002): LOCKED → transaction lock
>     - D (offset 1003): LOCKED → transaction lock
>     - E (offset 1004): LOCKED → transaction lock
>     - F (offset 1005): LOCKED → transaction lock
>
> Recovery (JobManager detects failure):
>   Time 10:05 - JobManager aborts checkpoint CP1
>   Time 10:06 - Rollback transaction "checkpoint-CP1"
>               → A, B, C, D, E, F become AVAILABLE again
>   Time 10:07 - Restore from checkpoint CP1-1
>   Time 10:08 - Consumer 1 (new instance) polls
>               → Kafka returns available records
>               → Might get [A, D, B] (different order than before!)
>   Time 10:09 - Consumer 2 polls
>               → Might get [C, E, F]
> ```
> *API Changes:*
> New Consumer API:
>
> ```
> // Batch-level transaction
> BatchTransaction beginBatchTransaction(String batchTransactionId);
> void commitBatchTransaction(BatchTransaction txn, Duration timeout)
>     throws TimeoutException, TransactionException;
> void rollbackBatchTransaction(BatchTransaction txn);
>
> // Task-level transaction
> TaskTransaction beginTaskTransaction(BatchTransaction parent, String taskId);
> void acknowledge(TaskTransaction txn, ConsumerRecord<K,V> record, AcknowledgeType type);
> void prepareTaskTransaction(TaskTransaction txn);
> void commitTaskTransaction(TaskTransaction txn);
> void rollbackTaskTransaction(TaskTransaction txn);
> ```
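>
> A hedged sketch of the task-level failure path on a worker, again written
> against a hypothetical interface mirroring the calls listed above. The
> assumption (not stated in this ticket) is that rolling back a single task
> transaction releases only that task's pending acknowledgments, while batch
> commit/rollback remains the coordinator's decision:
>
> ```
> import org.apache.kafka.clients.consumer.AcknowledgeType;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
>
> // Hypothetical task-level slice of the proposed API (does not exist in Kafka today).
> interface TaskTransaction {}
> interface TaskLevelConsumer<K, V> {
>     void acknowledge(TaskTransaction txn, ConsumerRecord<K, V> record, AcknowledgeType type);
>     void prepareTaskTransaction(TaskTransaction txn);
>     void rollbackTaskTransaction(TaskTransaction txn);
> }
>
> class WorkerTaskSketch {
>     // Worker side: acknowledge within the task transaction, prepare on success,
>     // roll back the task (assumed to release its records) if processing fails.
>     // commitTaskTransaction/commitBatchTransaction would be driven by the
>     // coordinator once every task has prepared.
>     void runTask(TaskLevelConsumer<String, String> consumer, TaskTransaction task,
>                  Iterable<ConsumerRecord<String, String>> records) {
>         try {
>             for (ConsumerRecord<String, String> record : records) {
>                 process(record);
>                 consumer.acknowledge(task, record, AcknowledgeType.ACCEPT);
>             }
>             consumer.prepareTaskTransaction(task);   // Phase 1 vote: ready to commit
>         } catch (RuntimeException e) {
>             consumer.rollbackTaskTransaction(task);  // assumed to release only this task's records
>             throw e;
>         }
>     }
>
>     void process(ConsumerRecord<String, String> record) { /* write to the framework's sink buffer */ }
> }
> ```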
> *New Configuration:*
>
> ```
> share.acknowledgement.mode=transactional   # Default: explicit
> share.batch.transaction.timeout.ms=300000
> share.task.transaction.timeout.ms=120000
> share.transaction.commit.timeout.ms=30000
> share.transaction.auto.rollback=true
> ```
>
> *Target Use Cases:* Apache Spark structured streaming, Apache Flink
> exactly-once checkpoints, any coordinator-worker streaming framework.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)