[
https://issues.apache.org/jira/browse/KAFKA-19883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Shekhar Prasad Rajak updated KAFKA-19883:
-----------------------------------------
Description:
Add transactional acknowledgment support to Kafka Share Groups to enable
+*exactly-once read semantics*+ in distributed stream processing engines like
Apache Spark and Apache Flink.
The exactly-once guarantee comes from transactional acknowledgments: no record is
permanently acknowledged until the framework's checkpoint commits.
*Current Behavior:*
Share Groups currently support only an immediate acknowledgment mode, in which
records are permanently acknowledged as soon as consumer.acknowledge() is called.
This creates data-loss scenarios in distributed streaming frameworks:
1. Worker polls and acknowledges records to Kafka
2. Records are permanently removed from Kafka
3. Checkpoint/batch fails before writing to sink
4. Records are lost (acknowledged but never reached sink)
This prevents exactly-once semantics in Spark/Flink with Share Groups.
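As a minimal sketch of this failure window, assuming the existing
KafkaShareConsumer API from KIP-932 (the writeToSink() step is a hypothetical
stand-in for the framework's output path):
```
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "demo-share-group");
props.put("share.acknowledgement.mode", "explicit");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
    consumer.subscribe(List.of("events"));
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        consumer.acknowledge(record, AcknowledgeType.ACCEPT);  // step 1
    }
    consumer.commitSync();   // step 2: acknowledgments are now permanent
    writeToSink(records);    // step 3: if this throws, the records are already
                             // gone from Kafka -- step 4, data loss
}
```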
*Expected Behavior:*
Share Groups should support transactional acknowledgment mode where:
1. Records are acknowledged within transactions
2. Acknowledgments are pending until transaction commits
3. Transaction rollback makes ALL records available for retry
4. Commits coordinate atomically with framework checkpoints/batches
*Proposed Solution:*
Implement hierarchical two-phase commit transactions:
```
BatchTransaction (coordinator/driver level)
├─ TaskTransaction1 (worker/executor level)
│ ├─ acknowledge(record1, ACCEPT)
│ ├─ acknowledge(record2, ACCEPT)
│ └─ prepare() ← Phase 1
│
├─ TaskTransaction2 (worker/executor level)
│ ├─ acknowledge(record3, ACCEPT)
│ └─ prepare() ← Phase 1
│
└─ commit() ← Phase 2: Atomically commits all task transactions
```
- Batch transactions (coordinator-level)
- Task transactions (worker-level)
- Records stay in Kafka until transaction commits
- Rollback on failure → no data loss
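As a sketch, the happy path under the proposed API would mirror the tree above.
These signatures are the ones proposed in this ticket and do not exist in Kafka
today; record1..record3 are previously polled records:
```
// Coordinator/driver: one batch transaction per checkpoint.
BatchTransaction batch = consumer.beginBatchTransaction("checkpoint-CP1");

// Worker/executor: each task acknowledges inside its own task transaction.
TaskTransaction task1 = consumer.beginTaskTransaction(batch, "task-1");
consumer.acknowledge(task1, record1, AcknowledgeType.ACCEPT);
consumer.acknowledge(task1, record2, AcknowledgeType.ACCEPT);
consumer.prepareTaskTransaction(task1);          // Phase 1: task-1 votes ready

TaskTransaction task2 = consumer.beginTaskTransaction(batch, "task-2");
consumer.acknowledge(task2, record3, AcknowledgeType.ACCEPT);
consumer.prepareTaskTransaction(task2);          // Phase 1: task-2 votes ready

// Phase 2: atomically commit all prepared task transactions.
consumer.commitBatchTransaction(batch, Duration.ofSeconds(30));
```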
Example: recovery after a failure:
```
Checkpoint CP1 processing:
Time 10:00 - Consumer 1 polls, gets records [A, B, C] (offsets 1000, 1001, 1002)
Time 10:01 - Consumer 2 polls, gets records [D, E, F] (offsets 1003, 1004, 1005)
Time 10:02 - Consumer 1 acknowledges A, B, C in transaction "checkpoint-CP1"
Time 10:03 - Consumer 2 acknowledges D, E, F in transaction "checkpoint-CP1"
Time 10:04 - Consumer 1 crashes before prepare()

Broker state:
Transaction "checkpoint-CP1": ACTIVE (not prepared)
Records in transaction:
- A (offset 1000): LOCKED → transaction lock
- B (offset 1001): LOCKED → transaction lock
- C (offset 1002): LOCKED → transaction lock
- D (offset 1003): LOCKED → transaction lock
- E (offset 1004): LOCKED → transaction lock
- F (offset 1005): LOCKED → transaction lock

Recovery (JobManager detects failure):
Time 10:05 - JobManager aborts checkpoint CP1
Time 10:06 - Rollback transaction "checkpoint-CP1"
             → A, B, C, D, E, F become AVAILABLE again
Time 10:07 - Restore from the previous checkpoint (CP1-1)
Time 10:08 - Consumer 1 (new instance) polls
             → Kafka returns available records
             → Might get [A, D, B] (different order than before!)
Time 10:09 - Consumer 2 polls
             → Might get [C, E, F]
```
*API Changes:*
New Consumer API (proposed):
```
// Batch-level transaction
BatchTransaction beginBatchTransaction(String batchTransactionId);
void commitBatchTransaction(BatchTransaction txn, Duration timeout)
    throws TimeoutException, TransactionException;
void rollbackBatchTransaction(BatchTransaction txn);

// Task-level transaction
TaskTransaction beginTaskTransaction(BatchTransaction parent, String taskId);
void acknowledge(TaskTransaction txn, ConsumerRecord<K, V> record, AcknowledgeType type);
void prepareTaskTransaction(TaskTransaction txn);
void commitTaskTransaction(TaskTransaction txn);
void rollbackTaskTransaction(TaskTransaction txn);
```
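For illustration, a sketch of how a framework checkpoint callback might drive
these proposed calls, including the rollback path. This collapses coordinator
and worker into one consumer for brevity; checkpointId, taskId, and polled are
hypothetical framework-supplied values:
```
BatchTransaction batch = consumer.beginBatchTransaction("checkpoint-" + checkpointId);
try {
    TaskTransaction task = consumer.beginTaskTransaction(batch, taskId);
    for (ConsumerRecord<String, String> record : polled) {
        consumer.acknowledge(task, record, AcknowledgeType.ACCEPT);
    }
    consumer.prepareTaskTransaction(task);                          // Phase 1
    consumer.commitBatchTransaction(batch, Duration.ofSeconds(30)); // Phase 2
} catch (TimeoutException | TransactionException e) {
    // Nothing was permanently acknowledged; every record in the batch
    // becomes AVAILABLE again for redelivery.
    consumer.rollbackBatchTransaction(batch);
    throw e;
}
```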
*New Configuration:*
```
share.acknowledgement.mode=transactional    # Default: explicit
share.batch.transaction.timeout.ms=300000
share.task.transaction.timeout.ms=120000
share.transaction.commit.timeout.ms=30000
share.transaction.auto.rollback=true
```
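A hypothetical consumer setup using these proposed settings (none of the
share.*transaction* keys are recognized by current brokers or clients):
```
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "flink-share-group");
// Proposed in this ticket; not part of current Kafka releases:
props.put("share.acknowledgement.mode", "transactional");
props.put("share.batch.transaction.timeout.ms", "300000");  // per checkpoint
props.put("share.task.transaction.timeout.ms", "120000");   // per task
props.put("share.transaction.commit.timeout.ms", "30000");
props.put("share.transaction.auto.rollback", "true");

KafkaShareConsumer<String, String> consumer =
    new KafkaShareConsumer<>(props, new StringDeserializer(), new StringDeserializer());
```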
*Target Use Cases:* Apache Spark Structured Streaming, Apache Flink
exactly-once checkpoints, and any coordinator-worker streaming framework.
> Support Hierarchical Transactional Acknowledgments for Share Groups
> -------------------------------------------------------------------
>
> Key: KAFKA-19883
> URL: https://issues.apache.org/jira/browse/KAFKA-19883
> Project: Kafka
> Issue Type: New Feature
> Components: clients, consumer, core
> Affects Versions: 4.2.0
> Reporter: Shekhar Prasad Rajak
> Assignee: Andrew Schofield
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)