Eswarar Siva created KAFKA-20685:
------------------------------------
Summary: EOS: checkpoint file written at restoration completion
persists through RUNNING, state wipe is skipped after unclean crash
Key: KAFKA-20685
URL: https://issues.apache.org/jira/browse/KAFKA-20685
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 4.1.2
Environment: Kafka Streams 4.1.2 client, exactly_once_v2, default
state updater enabled, persistent timestamped RocksDB key-value stores,
commit.interval.ms=500. Kubernetes StatefulSets with persistent volumes that
survive pod restarts, static membership (group.instance.id per pod).
Reporter: Eswarar Siva
While crash testing our Streams app I found behavior that doesn't match the
documented EOS state store model. Reproduced twice with SIGKILL tests, details
below.
h3. Observed behavior (reproducible)
# After restoration completes, a {{.checkpoint}} file exists in the task
directory and stays there unchanged for the whole RUNNING session. Its offsets
stay frozen at their restore-time values and its mtime never moves. I verified
this on disk while the app was processing around 16k rec/s.
# SIGKILL during active processing, zero grace period. On restart the instance
finds that stale checkpoint and logs "State store X initialized from checkpoint
with offset ...". There is no TaskCorruptedException and no wipe; it just
replays the changelog tail (81,732 records in our test). This happened
consistently across two independent kills.
# The wipe path ("State store did not find checkpoint offsets while stores are
not empty...") only fired in a different test where the crash happened during
restoration itself, before any checkpoint entry existed.
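One way to check the first observation from outside the application is to sample the file's mtime twice while the task is in RUNNING. This is a minimal sketch and not part of Kafka: {{CheckpointMtimeProbe}} and its helpers are names made up for illustration, and the path passed in is assumed to follow the usual {{<state.dir>/<application.id>/<task id>/.checkpoint}} layout.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;

// Hypothetical helper for illustration only; not a Kafka class.
final class CheckpointMtimeProbe {

    /** Returns the file's last-modified time, or null if the file does not exist. */
    static FileTime mtimeOrNull(Path file) throws IOException {
        return Files.exists(file) ? Files.getLastModifiedTime(file) : null;
    }

    /** True only if the file existed at the first sample and the mtime did not move. */
    static boolean unchanged(FileTime before, FileTime after) {
        return before != null && before.equals(after);
    }

    public static void main(String[] args) throws Exception {
        // args[0]: path to the task's .checkpoint file (layout assumed, see lead-in)
        // args[1]: optional wait between samples in milliseconds
        Path checkpoint = Paths.get(args[0]);
        long waitMs = args.length > 1 ? Long.parseLong(args[1]) : 5_000L;

        FileTime before = mtimeOrNull(checkpoint);
        Thread.sleep(waitMs);
        FileTime after = mtimeOrNull(checkpoint);

        System.out.println("before=" + before + " after=" + after
                + " unchanged=" + unchanged(before, after));
    }
}
```

Running it with a wait longer than {{commit.interval.ms}} while records are flowing should be enough to tell whether anything rewrites the file in steady state.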
h3. Expected behavior
Under EOS an unclean crash during RUNNING should hit the no-checkpoint path,
throw TaskCorruptedException and wipe local state, because the store may
contain uncommitted writes. RocksDBStore disables the WAL and Streams doesn't
flush on commit under EOS, so RocksDB background memtable flushes can persist
writes from a transaction that later gets aborted. The flushes don't know
anything about transaction boundaries.
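The init-time decision I am describing can be modelled roughly as follows. This is a simplified sketch of the behavior as I understand it from the observations above, not the actual Streams code: {{EosInitDecisionSketch}}, the {{Action}} enum and {{onInit}} are hypothetical names, and the real logic works per store and per changelog partition.

```java
// Simplified model for illustration only; not the Kafka Streams implementation.
final class EosInitDecisionSketch {

    enum Action {
        RESTORE_FROM_CHECKPOINT,  // trust local state, replay only the changelog tail
        WIPE_AND_FULL_RESTORE,    // TaskCorruptedException path: wipe local state, restore from scratch
        FULL_RESTORE              // nothing to trust, nothing to wipe
    }

    /**
     * Decides what happens at task initialization.
     *
     * @param eosEnabled      whether processing.guarantee is exactly_once_v2
     * @param checkpointFound whether a checkpoint offset exists for the store
     * @param storeEmpty      whether the local store directory holds no data
     */
    static Action onInit(boolean eosEnabled, boolean checkpointFound, boolean storeEmpty) {
        if (checkpointFound) {
            // The checkpoint is trusted regardless of when it was written.
            return Action.RESTORE_FROM_CHECKPOINT;
        }
        if (eosEnabled && !storeEmpty) {
            // Non-empty store without a checkpoint: may contain uncommitted writes.
            return Action.WIPE_AND_FULL_RESTORE;
        }
        return Action.FULL_RESTORE;
    }

    public static void main(String[] args) {
        // Expected after an unclean RUNNING crash under EOS: no checkpoint, store not empty.
        System.out.println("expected: " + onInit(true, false, false));
        // Observed: the restoration-time checkpoint is still on disk.
        System.out.println("observed: " + onInit(true, true, false));
    }
}
```

The point of the model is that the wipe depends entirely on the checkpoint being absent, so any checkpoint that survives into RUNNING disables it.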
h3. Code paths (verified on the 4.1.2 tag)
* {{DefaultStateUpdater.maybeCompleteRestoration}} (line 678) calls
{{task.maybeCheckpoint(true)}} (line 683) on restore completion with no EOS
condition. {{DefaultStateUpdater.maybeCheckpointTasks}} (line 712) also calls
{{task.maybeCheckpoint(false)}} (line 724) for all updating tasks every commit
interval, again with no EOS condition.
* Nothing deletes that file on the restored-to-RUNNING transition.
{{StreamTask.completeRestoration}} writes its own checkpoint only if
{{!eosEnabled}}, which suggests the intent was that no checkpoint should exist
past that point under EOS. The file written by the state updater is simply left
in place.
* During RUNNING under EOS, {{StreamTask.postCommit}} only checkpoints when
{{enforceCheckpoint==true}} (suspend/close/rebalance), so the stale file never
gets refreshed or removed in steady state.
* The only delete sites I could find: init time
({{ProcessorStateManager.initializeStoreOffsetsFromCheckpoint}}), resume from
SUSPENDED ({{deleteCheckPointFileIfEOSEnabled}}, added by KAFKA-10362), and
{{DefaultStateUpdater.removeCheckpointForCorruptedTask}}.
Net effect: the restoration-completion checkpoint survives the entire RUNNING
session and gets trusted at the next init, so a crash during RUNNING silently
skips the EOS wipe.
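The lifecycle described by these code paths can be traced with a small model. It is a sketch under stated assumptions: it covers only the EOS case, {{CheckpointLifecycleSketch}} and its methods are hypothetical names, and the offsets in {{main}} are illustrative values, not numbers from my test.

```java
import java.util.OptionalLong;

// Simplified EOS-only model for illustration; not the Kafka Streams implementation.
final class CheckpointLifecycleSketch {

    // Models the offset stored in the on-disk .checkpoint file (empty = no file).
    private OptionalLong checkpointedOffset = OptionalLong.empty();
    // Models the committed end offset of the changelog partition.
    private long changelogEndOffset = 0L;

    /** State updater path: a checkpoint is written on restore completion, with no EOS condition. */
    void completeRestoration(long restoredUpTo) {
        changelogEndOffset = restoredUpTo;
        checkpointedOffset = OptionalLong.of(restoredUpTo);
    }

    /** RUNNING commit under EOS: the changelog advances, the file is rewritten only when enforced. */
    void commit(long recordsWritten, boolean enforceCheckpoint) {
        changelogEndOffset += recordsWritten;
        if (enforceCheckpoint) {
            checkpointedOffset = OptionalLong.of(changelogEndOffset);
        }
    }

    OptionalLong checkpointedOffset() {
        return checkpointedOffset;
    }

    /** Records replayed at the next init if the process is killed right now. */
    long tailReplayedAfterCrash() {
        return changelogEndOffset - checkpointedOffset.orElse(0L);
    }

    /** The wipe fires only when no checkpoint exists and the store holds data. */
    boolean wipeTriggeredAfterCrash(boolean storeEmpty) {
        return !checkpointedOffset.isPresent() && !storeEmpty;
    }

    public static void main(String[] args) {
        CheckpointLifecycleSketch task = new CheckpointLifecycleSketch();
        task.completeRestoration(1_000L);   // illustrative offset
        for (int i = 0; i < 3; i++) {
            task.commit(500L, false);       // steady-state commits, no enforced checkpoint
        }
        System.out.println("checkpoint offset: " + task.checkpointedOffset().getAsLong());
        System.out.println("tail replayed:     " + task.tailReplayedAfterCrash());
        System.out.println("wipe triggered:    " + task.wipeTriggeredAfterCrash(false));
    }
}
```

In this model the checkpoint offset stays at the restore-time value while the changelog end keeps advancing, which matches the frozen file and the tail replay seen on restart.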
h3. Why this can violate EOS
The tail replay from the stale checkpoint reads the changelog with
read_committed isolation, so it skips aborted records and cannot clean up
uncommitted writes that a background memtable flush already pushed to SST
files. Reprocessing the uncommitted input offsets overwrites the leftover
values only when the topology is deterministic and driven purely by input
records. State written by wall-clock punctuators is never regenerated.
Read-before-write logic is the worst case: dedup on an ID can read the leftover
uncommitted value and silently skip the redelivered record, so exactly-once
becomes zero-times with no error surfaced anywhere.
KIP-892's motivation section states that under EOS, crash failures must wipe
task state because data is written to the store before the changelog commit
completes. The observed behavior bypasses that wipe.
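The dedup case can be shown with a toy model. This is a sketch, not Streams code: {{DedupLeftoverSketch}} and its methods are hypothetical, a {{HashSet}} stands in for the RocksDB store, and the model assumes the uncommitted write had already been flushed to disk when the crash happened.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model for illustration only; not the Kafka Streams implementation.
final class DedupLeftoverSketch {

    private Set<String> localStore = new HashSet<>();      // stands in for the on-disk store
    private Set<String> committedState = new HashSet<>();  // what the committed changelog holds
    private final List<String> pendingOutput = new ArrayList<>();   // output of the open transaction
    private final List<String> committedOutput = new ArrayList<>(); // output visible downstream

    /** Read-before-write dedup: forward the record only if its ID has not been seen. */
    boolean process(String id) {
        if (!localStore.add(id)) {
            return false; // ID already in the store, record treated as a duplicate
        }
        pendingOutput.add(id);
        return true;
    }

    /** Transaction commit: pending output and store writes become durable together. */
    void commit() {
        committedOutput.addAll(pendingOutput);
        pendingOutput.clear();
        committedState = new HashSet<>(localStore);
    }

    /** Unclean crash: the open transaction aborts; local state is wiped only if requested. */
    void crash(boolean wipeAndRestore) {
        pendingOutput.clear();
        if (wipeAndRestore) {
            localStore = new HashSet<>(committedState); // wipe, then restore committed changelog
        }
        // Otherwise the uncommitted write stays in localStore (assumed already flushed).
    }

    /** Runs: commit "a", process "b", crash before commit, redeliver "b", commit. */
    static List<String> run(boolean wipeAndRestore) {
        DedupLeftoverSketch task = new DedupLeftoverSketch();
        task.process("a");
        task.commit();
        task.process("b");          // written to the store, transaction still open
        task.crash(wipeAndRestore);
        task.process("b");          // redelivered after restart
        task.commit();
        return new ArrayList<>(task.committedOutput);
    }

    public static void main(String[] args) {
        System.out.println("stale checkpoint, no wipe: " + run(false)); // prints [a]
        System.out.println("wipe and full restore:     " + run(true));  // prints [a, b]
    }
}
```

Without the wipe, record {{b}} never reaches the committed output and nothing reports an error, which is the zero-times outcome described above.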
h3. History
Before the state updater became the default (KAFKA-10199, enabled by default
via PR #16107), no checkpoint existed during RUNNING under EOS, so a RUNNING
crash reliably triggered the wipe. The restoration-time checkpoint was added
deliberately in PR #12279 (motivated by KAFKA-12634) with a safety argument
scoped to crashes during restoration, where over-restore is idempotent. That
argument doesn't extend to the RUNNING phase.
Reproduced on 4.1.2. Not yet tested on 4.3.0 where KIP-1035 reworks the offset
storage.
h3. Related
KAFKA-10199, KAFKA-12634 (introduced restoration checkpointing), KAFKA-10362
(precedent: checkpoint deleted on resume under EOS for the same class of risk),
KAFKA-16017 (related but different mechanism), KIP-892 (the durable fix, not
yet released).
h3. Proposed fix
On the restored-to-RUNNING transition under EOS, delete the checkpoint file,
mirroring what KAFKA-10362 did on the resume path with
{{deleteCheckPointFileIfEOSEnabled()}}. This restores the invariant that no
checkpoint exists during RUNNING under EOS. The cost is full restores after
unclean RUNNING crashes, which is the cost KIP-892 eventually removes properly.
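The shape of the change could look roughly like this. It is only a sketch of the idea using plain {{java.nio}} calls: {{RunningTransitionCheckpointSketch}} and {{onRestoredToRunning}} are hypothetical names, and a real patch would go through the existing checkpoint handling in Streams rather than touching the file directly.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of the proposed fix; not the Kafka Streams implementation.
final class RunningTransitionCheckpointSketch {

    static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    /**
     * Hook assumed to run once when a task moves from restoration to RUNNING.
     * Under EOS it removes the restoration-time checkpoint so that an unclean
     * crash during RUNNING finds no checkpoint and takes the wipe path.
     *
     * @return true if a checkpoint file was deleted
     */
    static boolean onRestoredToRunning(Path taskDir, boolean eosEnabled) throws IOException {
        if (!eosEnabled) {
            return false; // non-EOS keeps its checkpoint
        }
        return Files.deleteIfExists(taskDir.resolve(CHECKPOINT_FILE_NAME));
    }
}
```

Checkpoints written with {{enforceCheckpoint==true}} on suspend or close would still be created after this point, so clean shutdowns keep their fast restart.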