[ 
https://issues.apache.org/jira/browse/KAFKA-20685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18088371#comment-18088371
 ] 

Eswarar Siva commented on KAFKA-20685:
--------------------------------------

Thanks [~bbejeck] and [~mjsax]. Self-assigned.

I'll open the PR against the 4.2 branch. The plan is to mirror what KAFKA-10362 
did: delete the checkpoint file when an EOS active task moves from restored to 
RUNNING, plus a test. If a 4.1.3 release ever happens, I'm happy to cherry-pick 
the fix to 4.1 too.
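
A minimal standalone sketch of that idea, for illustration only and not the actual patch. The class and method names (EosCheckpointCleanupSketch, onRestoredToRunning, newTaskDirWithCheckpoint) are hypothetical and are not Kafka Streams internals. The sketch assumes the checkpoint is a plain file named .checkpoint in the task directory, as described in the issue.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Standalone illustration; none of these names exist in Kafka Streams.
public class EosCheckpointCleanupSketch {

    static final String CHECKPOINT_FILE = ".checkpoint";

    // Stand-in for the state updater writing a checkpoint when restoration completes.
    static Path newTaskDirWithCheckpoint() {
        try {
            Path taskDir = Files.createTempDirectory("task-0_0-");
            Files.writeString(taskDir.resolve(CHECKPOINT_FILE), "restore-time offsets\n");
            return taskDir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hypothetical hook for the restored -> RUNNING transition.
    // Under EOS the checkpoint is removed so that an unclean crash during
    // RUNNING finds no checkpoint at the next init. Returns true if a file
    // was actually deleted.
    static boolean onRestoredToRunning(Path taskDir, boolean eosEnabled) {
        if (!eosEnabled) {
            return false; // non-EOS tasks keep their checkpoint
        }
        try {
            return Files.deleteIfExists(taskDir.resolve(CHECKPOINT_FILE));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path taskDir = newTaskDirWithCheckpoint();
        boolean deleted = onRestoredToRunning(taskDir, true);
        System.out.println("deleted=" + deleted
            + ", checkpointExists=" + Files.exists(taskDir.resolve(CHECKPOINT_FILE)));
    }
}
```

The test for the real change would presumably assert the same two things against the task directory: the file exists right after restoration completes, and it is gone once the EOS task is RUNNING.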

One question: should Affects Version/s be 3.8.0 - 4.2.x instead of just 4.1.2? 
The state updater has been the default since 3.8, so the same behavior should 
be present across that whole range.

> EOS: checkpoint file written at restoration completion persists through 
> RUNNING, state wipe is skipped after unclean crash
> --------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-20685
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20685
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 4.1.2
>         Environment: Kafka Streams 4.1.2 client, exactly_once_v2, default 
> state updater enabled, persistent timestamped RocksDB key-value stores, 
> commit.interval.ms=500. Kubernetes StatefulSets with persistent volumes that 
> survive pod restarts, static membership (group.instance.id per pod).
>            Reporter: Eswarar Siva
>            Priority: Critical
>              Labels: eos
>
> While crash testing our Streams app I found behavior that doesn't match the 
> documented EOS state store model. Reproduced twice with SIGKILL tests, 
> details below.
> h3. Observed behavior (reproducible)
> # After restoration completes, a {{.checkpoint}} file exists in the task 
> directory and stays there unchanged for the whole RUNNING session. Its 
> offsets stay frozen at their restore-time values and its mtime never moves 
> (verified on disk while processing around 16k rec/s).
> # SIGKILL during active processing, zero grace period. On restart the 
> instance finds that stale checkpoint, logs "State store X initialized from 
> checkpoint with offset ...", no TaskCorruptedException, no wipe, and just 
> replays the changelog tail (81,732 records in our test). Happened 
> consistently across two independent kills.
> # The wipe path ("State store did not find checkpoint offsets while stores 
> are not empty...") only fired in a different test where the crash happened 
> during restoration itself, before any checkpoint entry existed.
> h3. Expected behavior
> Under EOS an unclean crash during RUNNING should hit the no-checkpoint path, 
> throw TaskCorruptedException and wipe local state, because the store may 
> contain uncommitted writes. RocksDBStore disables the WAL and Streams doesn't 
> flush on commit under EOS, so RocksDB background memtable flushes can persist 
> writes from a transaction that later gets aborted. The flushes don't know 
> anything about transaction boundaries.
> h3. Code paths (verified on the 4.1.2 tag)
> * {{DefaultStateUpdater.maybeCompleteRestoration}} (line 678) calls 
> {{task.maybeCheckpoint(true)}} (line 683) on restore completion with no EOS 
> condition. {{DefaultStateUpdater.maybeCheckpointTasks}} (line 712) also calls 
> {{task.maybeCheckpoint(false)}} (line 724) for all updating tasks every 
> commit interval, again with no EOS condition.
> * Nothing deletes that file on the restored to RUNNING transition. 
> {{StreamTask.completeRestoration}} writes its own checkpoint only if 
> {{!eosEnabled}}, which suggests the intent was that no checkpoint should 
> exist past that point under EOS. The file written by the state updater just 
> stays.
> * During RUNNING under EOS, {{StreamTask.postCommit}} only checkpoints when 
> {{enforceCheckpoint==true}} (suspend/close/rebalance), so the stale file 
> never gets refreshed or removed in steady state.
> * The only delete sites I could find: init time 
> ({{ProcessorStateManager.initializeStoreOffsetsFromCheckpoint}}), resume from 
> SUSPENDED ({{deleteCheckPointFileIfEOSEnabled}}, added by KAFKA-10362), and 
> {{DefaultStateUpdater.removeCheckpointForCorruptedTask}}.
> Net effect: the restoration-completion checkpoint survives the entire RUNNING 
> session and gets trusted at the next init, so a crash during RUNNING silently 
> skips the EOS wipe.
> h3. Why this can violate EOS
> The tail replay from the stale checkpoint runs read_committed, so it skips 
> aborted records and cannot clean up uncommitted writes that a background 
> memtable flush already pushed to SST files. Reprocessing of the uncommitted 
> input offsets overwrites the leftover values only when the topology is 
> deterministic and input record driven. State written by wall clock 
> punctuators is never regenerated. And read-before-write logic is the worst 
> case: dedup on an ID can read the leftover uncommitted value and silently 
> skip the redelivered record, so exactly-once becomes zero-times with no error 
> surfaced anywhere.
> KIP-892's motivation section states that under EOS, crash failures must wipe 
> task state because data is written to the store before the changelog commit 
> completes. The observed behavior bypasses that wipe.
> h3. History
> Before the state updater became the default (KAFKA-10199, enabled by default 
> via PR #16107), no checkpoint existed during RUNNING under EOS, so a RUNNING 
> crash reliably triggered the wipe. The restoration-time checkpoint was added 
> deliberately in PR #12279 (motivated by KAFKA-12634) with a safety argument 
> scoped to crashes during restoration, where over-restore is idempotent. That 
> argument doesn't extend to the RUNNING phase.
> Reproduced on 4.1.2. Not yet tested on 4.3.0 where KIP-1035 reworks the 
> offset storage.
> h3. Related
> KAFKA-10199, KAFKA-12634 (introduced restoration checkpointing), KAFKA-10362 
> (precedent: checkpoint deleted on resume under EOS for the same class of 
> risk), KAFKA-16017 (related but different mechanism), KIP-892 (the durable 
> fix, not yet released).
> h3. Proposed fix
> On the restored to RUNNING transition under EOS, delete the checkpoint file, 
> mirroring what KAFKA-10362 did on the resume path with 
> {{deleteCheckPointFileIfEOSEnabled()}}. This restores the invariant that no 
> checkpoint exists during RUNNING under EOS. The cost is full restores after 
> unclean RUNNING crashes, which is the cost KIP-892 eventually removes 
> properly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
