Yun Tang created FLINK-26101:
--------------------------------

             Summary: Avoid shared state registry to discard duplicate changelog state
                 Key: FLINK-26101
                 URL: https://issues.apache.org/jira/browse/FLINK-26101
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
            Reporter: Yun Tang
            Assignee: Yun Tang
             Fix For: 1.15.0


With the changelog state backend, the same materialized keyed state handle is 
registered multiple times, and {{SharedStateRegistryImpl}} discards the 
duplicate state handle.

{code:java}
if (!Objects.equals(state, entry.stateHandle)) {
    if (entry.confirmed || isPlaceholder(state)) {
        scheduledStateDeletion = state;
    } else {
        // Old entry is not in a confirmed checkpoint yet, and the new one differs.
        // This might result from (omitted KG range here for simplicity):
        // 1. Flink recovers from a failure using a checkpoint 1
        // 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
        // 3. JM triggers checkpoint 2
        // 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
        // 5. TM crashes; everything is repeated from (2)
        // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
        // 7. JM triggers checkpoint 3
        // 8. TM sends NEW state "xyz-002.sst"
        // 9. JM discards it as duplicate
        // 10. checkpoint completes, but a wrong SST file is used
        // So we use a new entry and discard the old one:
        scheduledStateDeletion = entry.stateHandle;
        entry.stateHandle = state;
    }
}
{code}

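Thus, we need to implement the {{#equals}} method for the registered state 
handles, so that re-registering the same materialized handle is recognized as 
equal and is not scheduled for deletion.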
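
As a rough illustration of why value-based equality matters here, the sketch 
below uses two hypothetical classes, {{SketchStateHandle}} and 
{{SketchRegistry}}, which are not Flink classes. The registry mimics only the 
{{Objects.equals}} check from the snippet quoted above and assumes the existing 
entry is already confirmed. The fields compared in {{equals}} (a file path and 
a size) are an assumption made for the example, not the fields the real handles 
would compare.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for a registered state handle; not a Flink class. */
final class SketchStateHandle {
    private final String filePath; // assumed identity of the underlying file
    private final long stateSize;  // assumed size of the state in bytes

    SketchStateHandle(String filePath, long stateSize) {
        this.filePath = Objects.requireNonNull(filePath);
        this.stateSize = stateSize;
    }

    // Value-based equality: two handles that point to the same file are equal,
    // even if they are distinct Java objects.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SketchStateHandle)) {
            return false;
        }
        SketchStateHandle that = (SketchStateHandle) o;
        return stateSize == that.stateSize && filePath.equals(that.filePath);
    }

    @Override
    public int hashCode() {
        return Objects.hash(filePath, stateSize);
    }
}

/** Hypothetical registry that mimics only the equality check quoted above. */
final class SketchRegistry {
    private final Map<String, SketchStateHandle> entries = new HashMap<>();

    /**
     * Registers a handle under the given key and returns the handle that would
     * be scheduled for deletion, or null if nothing would be deleted.
     * Assumption: every existing entry is treated as already confirmed.
     */
    SketchStateHandle register(String key, SketchStateHandle state) {
        SketchStateHandle existing = entries.get(key);
        if (existing == null) {
            entries.put(key, state);
            return null;
        }
        if (!Objects.equals(state, existing)) {
            // The handles differ, so the newly registered one is discarded.
            return state;
        }
        // Equal handles: the re-registration is harmless, nothing is deleted.
        return null;
    }
}
```

With this {{equals}} in place, registering a second handle object for the same 
file returns null (nothing to delete). Without it, {{Objects.equals}} falls back 
to reference equality, and the re-registered handle would be scheduled for 
deletion even though it points to state that is still in use.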



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
