Eswarar Siva created KAFKA-20685:
------------------------------------

             Summary: EOS: checkpoint file written at restoration completion 
persists through RUNNING, state wipe is skipped after unclean crash
                 Key: KAFKA-20685
                 URL: https://issues.apache.org/jira/browse/KAFKA-20685
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 4.1.2
         Environment: Kafka Streams 4.1.2 client, exactly_once_v2, default 
state updater enabled, persistent timestamped RocksDB key-value stores, 
commit.interval.ms=500. Kubernetes StatefulSets with persistent volumes that 
survive pod restarts, static membership (group.instance.id per pod).
            Reporter: Eswarar Siva


While crash-testing our Streams app, I found behavior that doesn't match the 
documented EOS state store model. I reproduced it twice with SIGKILL tests; 
details below.

h3. Observed behavior (reproducible)

# After restoration completes, a {{.checkpoint}} file exists in the task 
directory and stays there unchanged for the whole RUNNING session: it holds 
frozen restore-time offsets and its mtime never moves. Verified on disk while 
processing around 16k records/s.
# SIGKILL during active processing, zero grace period. On restart the instance 
finds that stale checkpoint and logs "State store X initialized from checkpoint 
with offset ...". There is no TaskCorruptedException and no wipe; it just 
replays the changelog tail (81,732 records in our test). This happened 
consistently across two independent kills.
# The wipe path ("State store did not find checkpoint offsets while stores are 
not empty...") only fired in a different test where the crash happened during 
restoration itself, before any checkpoint entry existed.
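To re-check observation 1 without relying on logs, a small program can read a 
task's {{.checkpoint}} file twice, several commit intervals apart, and compare 
the mtime and the offsets. This is a sketch under assumptions: the class name 
{{CheckpointInspector}} is hypothetical, and the parser assumes the plain-text 
layout written by Streams' internal {{OffsetCheckpoint}} class (version {{0}}, 
an entry count, then one {{topic partition offset}} line per entry).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical diagnostic helper, not part of Kafka Streams. Assumes the
// OffsetCheckpoint text layout: version line ("0"), entry count, then one
// "topic partition offset" line per entry.
final class CheckpointInspector {

    static Map<String, Long> parse(List<String> lines) {
        if (lines.size() < 2 || !lines.get(0).trim().equals("0")) {
            throw new IllegalArgumentException("unexpected checkpoint version");
        }
        int expected = Integer.parseInt(lines.get(1).trim());
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (String line : lines.subList(2, lines.size())) {
            if (line.isBlank()) {
                continue;
            }
            String[] parts = line.trim().split("\\s+");
            if (parts.length != 3) {
                throw new IllegalArgumentException("malformed entry: " + line);
            }
            // Key: "<changelog topic>-<partition>", value: checkpointed offset.
            offsets.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
        }
        if (offsets.size() != expected) {
            throw new IllegalArgumentException(
                "expected " + expected + " entries, found " + offsets.size());
        }
        return offsets;
    }

    // Usage: java CheckpointInspector <task directory>, run while the task is RUNNING.
    public static void main(String[] args) throws IOException, InterruptedException {
        Path file = Path.of(args[0]).resolve(".checkpoint");
        FileTime mtimeBefore = Files.getLastModifiedTime(file);
        Map<String, Long> before = parse(Files.readAllLines(file));
        Thread.sleep(10_000); // 20 commit intervals at commit.interval.ms=500
        FileTime mtimeAfter = Files.getLastModifiedTime(file);
        Map<String, Long> after = parse(Files.readAllLines(file));
        System.out.println("mtime unchanged:   " + mtimeBefore.equals(mtimeAfter));
        System.out.println("offsets unchanged: " + before.equals(after));
    }
}
```

Under the behavior described here, both lines print {{true}} for a RUNNING EOS 
task, even though the input offsets move on with every commit.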

h3. Expected behavior

Under EOS, an unclean crash during RUNNING should hit the no-checkpoint path, 
throw TaskCorruptedException and wipe local state, because the store may 
contain uncommitted writes. RocksDBStore disables the WAL and Streams doesn't 
flush on commit under EOS, so RocksDB's background memtable flushes can persist 
writes from a transaction that is later aborted. Those flushes know nothing 
about transaction boundaries.
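A simplified model of the init-time decision this section expects, per store: 
a checkpoint offset, if found, is trusted, and only when none is found under EOS 
with a non-empty store does the task take the TaskCorruptedException/wipe path. 
The class, enum and method names are hypothetical; the real branch lives around 
{{ProcessorStateManager.initializeStoreOffsetsFromCheckpoint}}, and the ALOS 
branch is a simplification included only for contrast.

```java
import java.util.Optional;

// Simplified, hypothetical model of the init-time decision for one store;
// names are illustrative and do not match Kafka Streams internals.
final class EosInitDecision {

    enum Action { INIT_FROM_CHECKPOINT, RESTORE_FROM_CHANGELOG_START, WIPE_AND_RESTORE }

    static Action decide(boolean eosEnabled, Optional<Long> checkpointOffset, boolean storeNonEmpty) {
        if (checkpointOffset.isPresent()) {
            // The offset is trusted: only the changelog tail after it is replayed.
            return Action.INIT_FROM_CHECKPOINT;
        }
        if (eosEnabled && storeNonEmpty) {
            // TaskCorruptedException path: the store may hold uncommitted writes,
            // so local state is wiped and restored from the changelog.
            return Action.WIPE_AND_RESTORE;
        }
        // No checkpoint, and either ALOS or an empty store: restore from the start.
        return Action.RESTORE_FROM_CHANGELOG_START;
    }

    public static void main(String[] args) {
        // Crash during RUNNING with the stale restore-time checkpoint (observed):
        System.out.println(decide(true, Optional.of(1_000L), true)); // INIT_FROM_CHECKPOINT
        // Crash during RUNNING with no checkpoint on disk (expected):
        System.out.println(decide(true, Optional.empty(), true));    // WIPE_AND_RESTORE
    }
}
```

The point of the model is that the decision keys only on whether a checkpoint 
is present, not on how old it is, so a file frozen at restore time is enough to 
skip the wipe.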

h3. Code paths (verified on the 4.1.2 tag)

* {{DefaultStateUpdater.maybeCompleteRestoration}} (line 678) calls 
{{task.maybeCheckpoint(true)}} (line 683) on restore completion with no EOS 
condition. {{DefaultStateUpdater.maybeCheckpointTasks}} (line 712) also calls 
{{task.maybeCheckpoint(false)}} (line 724) for all updating tasks every commit 
interval, again with no EOS condition.
* Nothing deletes that file on the restored-to-RUNNING transition. 
{{StreamTask.completeRestoration}} writes its own checkpoint only if 
{{!eosEnabled}}, which suggests the intent was that no checkpoint should exist 
past that point under EOS. The file written by the state updater just stays.
* During RUNNING under EOS, {{StreamTask.postCommit}} only checkpoints when 
{{enforceCheckpoint==true}} (suspend/close/rebalance), so the stale file never 
gets refreshed or removed in steady state.
* The only delete sites I could find are at init time 
({{ProcessorStateManager.initializeStoreOffsetsFromCheckpoint}}), on resume from 
SUSPENDED ({{deleteCheckPointFileIfEOSEnabled}}, added by KAFKA-10362), and in 
{{DefaultStateUpdater.removeCheckpointForCorruptedTask}}.
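The code paths above can be condensed into a toy model that tracks only whether 
an EOS task's {{.checkpoint}} file exists. The class and its methods are 
hypothetical stand-ins for the Streams methods named in the comments, and the 
{{deleteOnRunning}} flag models the proposed fix; it is a sketch of the 
lifecycle as described in this report, not Streams code.

```java
// Hypothetical in-memory model of one EOS task's .checkpoint file, following the
// code paths listed above. It tracks only whether the file exists; not Streams code.
final class EosCheckpointLifecycle {
    private final boolean deleteOnRunning; // true models the proposed fix
    private boolean checkpointExists;

    EosCheckpointLifecycle(boolean deleteOnRunning) {
        this.deleteOnRunning = deleteOnRunning;
    }

    // DefaultStateUpdater: maybeCheckpointTasks / maybeCompleteRestoration call
    // task.maybeCheckpoint(...) with no EOS condition.
    void restorationCompleted() {
        checkpointExists = true;
    }

    // Restored -> RUNNING: StreamTask.completeRestoration writes only if !eosEnabled,
    // and nothing deletes the state updater's file unless the fix is applied.
    void transitionToRunning() {
        if (deleteOnRunning) {
            checkpointExists = false;
        }
    }

    // StreamTask.postCommit under EOS: writes only when enforceCheckpoint is true.
    void commit(boolean enforceCheckpoint) {
        if (enforceCheckpoint) {
            checkpointExists = true;
        }
    }

    // Suspend enforces a checkpoint; resume deletes it under EOS (KAFKA-10362).
    void suspendThenResume() {
        commit(true);
        checkpointExists = false;
    }

    // SIGKILL runs no cleanup, so the next init sees whatever is on disk.
    boolean checkpointOnNextInit() {
        return checkpointExists;
    }

    public static void main(String[] args) {
        for (boolean fix : new boolean[] {false, true}) {
            EosCheckpointLifecycle task = new EosCheckpointLifecycle(fix);
            task.restorationCompleted();
            task.transitionToRunning();
            for (int i = 0; i < 1_000; i++) {
                task.commit(false); // steady-state commits never touch the file
            }
            System.out.println("fix=" + fix + " -> checkpoint found after crash: "
                + task.checkpointOnNextInit());
        }
    }
}
```

Without the fix the model prints {{true}} (the stale file is found), with it 
{{false}}; a task that went through suspend/resume already ends up without a 
checkpoint today, which is the KAFKA-10362 precedent.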

Net effect: the restoration-completion checkpoint survives the entire RUNNING 
session and gets trusted at the next init, so a crash during RUNNING silently 
skips the EOS wipe.

h3. Why this can violate EOS

The tail replay from the stale checkpoint runs read_committed, so it skips 
aborted records and cannot clean up uncommitted writes that a background 
memtable flush already pushed to SST files. Reprocessing the uncommitted input 
offsets overwrites the leftover values only when the topology is deterministic 
and driven by input records. State written by wall-clock punctuators is never 
regenerated. Read-before-write logic is the worst case: dedup on an ID can read 
the leftover uncommitted value and silently skip the redelivered record, so 
exactly-once becomes zero-times with no error surfaced anywhere.
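The dedup case can be made concrete with a toy simulation: a record is marked 
as seen inside a transaction, a background flush persists that write, the 
transaction aborts on SIGKILL, and the record is redelivered after restart. 
Plain hash maps stand in for the SST files and the committed changelog; all 
names are hypothetical, and nothing here touches RocksDB or Kafka.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the dedup scenario above. "sst" stands for data that
// reached disk via a background memtable flush; the committed changelog is empty
// because the transaction that processed the record was aborted.
final class DedupZeroTimes {

    // Returns the IDs emitted downstream when the redelivered record is reprocessed.
    static List<String> reprocessAfterCrash(boolean wipeOnRestart) {
        Map<String, Boolean> sst = new HashMap<>();
        Map<String, Boolean> committedChangelog = new HashMap<>(); // aborted tx: nothing

        // Transaction T1 processes "id-42" and marks it as seen; a memtable flush
        // persists that write (no WAL, no transaction awareness).
        sst.put("id-42", true);
        // SIGKILL before T1 commits: T1 aborts, the input offset stays uncommitted.

        if (wipeOnRestart) {
            // EOS wipe path: state is rebuilt only from committed changelog data.
            sst = new HashMap<>(committedChangelog);
        }
        // Otherwise the stale checkpoint is trusted and the read_committed tail
        // replay adds nothing for T1, so the flushed uncommitted value remains.

        List<String> emitted = new ArrayList<>();
        for (String id : List.of("id-42")) { // id-42 is redelivered
            if (sst.containsKey(id)) {
                continue; // read-before-write dedup: "already seen", skip
            }
            sst.put(id, true);
            emitted.add(id);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("stale checkpoint: " + reprocessAfterCrash(false)); // []
        System.out.println("wiped state:      " + reprocessAfterCrash(true));  // [id-42]
    }
}
```

With the stale checkpoint the redelivered record is dropped (processed zero 
times); with the EOS wipe it is emitted exactly once.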

KIP-892's motivation section states that under EOS, crash failures must wipe 
task state because data is written to the store before the changelog commit 
completes. The observed behavior bypasses that wipe.

h3. History

Before the state updater became the default (KAFKA-10199, switched on by 
default via PR #16107), no checkpoint existed during RUNNING under EOS, so a 
crash during RUNNING reliably triggered the wipe. The restoration-time 
checkpoint was added deliberately in PR #12279 (motivated by KAFKA-12634) with 
a safety argument scoped to crashes during restoration, where over-restoring is 
idempotent. That argument doesn't extend to the RUNNING phase.

Reproduced on 4.1.2. Not yet tested on 4.3.0, where KIP-1035 reworks the offset 
storage.

h3. Related

KAFKA-10199, KAFKA-12634 (introduced restoration checkpointing), KAFKA-10362 
(precedent: checkpoint deleted on resume under EOS for the same class of risk), 
KAFKA-16017 (related but different mechanism), KIP-892 (the durable fix, not 
yet released).

h3. Proposed fix

On the restored-to-RUNNING transition under EOS, delete the checkpoint file, 
mirroring what KAFKA-10362 did on the resume path with 
{{deleteCheckPointFileIfEOSEnabled()}}. This restores the invariant that no 
checkpoint exists during RUNNING under EOS. The cost is a full restore after 
every unclean crash during RUNNING, which is the cost KIP-892 will eventually 
remove properly.
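The effect of the proposed fix can be sketched as a standalone helper that, on 
the restored-to-RUNNING transition, removes the task's {{.checkpoint}} file when 
EOS is enabled. The class and method names are hypothetical. Inside Streams the 
change would presumably just call the existing 
{{deleteCheckPointFileIfEOSEnabled()}} from the restored-to-RUNNING path rather 
than touch the file directly.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical standalone sketch of the proposed fix's effect; not Kafka Streams code.
final class EosCheckpointCleanup {
    static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    // Called once on the restored -> RUNNING transition.
    // Returns true if a checkpoint file was removed.
    static boolean onTransitionToRunning(Path taskDir, boolean eosEnabled) throws IOException {
        if (!eosEnabled) {
            return false; // ALOS: leave normal checkpointing alone
        }
        // Under EOS no checkpoint may survive into RUNNING, so a crash in RUNNING
        // hits the no-checkpoint path and the local state is wiped.
        return Files.deleteIfExists(taskDir.resolve(CHECKPOINT_FILE_NAME));
    }

    public static void main(String[] args) throws IOException {
        Path taskDir = Files.createTempDirectory("0_0");
        Files.writeString(taskDir.resolve(CHECKPOINT_FILE_NAME), "0\n0\n");
        System.out.println(onTransitionToRunning(taskDir, true));                  // true
        System.out.println(Files.exists(taskDir.resolve(CHECKPOINT_FILE_NAME)));  // false
    }
}
```

Using {{Files.deleteIfExists}} keeps the call idempotent, so a repeated 
transition or a task that never wrote a checkpoint is harmless.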



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
