Robert Hoyt created FLINK-33170:
-----------------------------------

             Summary: HybridSourceSplitEnumerator causes dropped records in 
Hybrid Sources with 3+ sources
                 Key: FLINK-33170
                 URL: https://issues.apache.org/jira/browse/FLINK-33170
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Common
    Affects Versions: 1.17.1, 1.16.2, 1.15.4, 1.19.0, 1.18.1
            Reporter: Robert Hoyt


Possibly related to FLINK-27916.

 

In all versions since 1.15.x there's a subtle bug in 
`HybridSourceSplitEnumerator` when it determines whether it's time to move on 
to the next source:

```
finishedReaders.add(subtaskId);
if (finishedReaders.size() == context.currentParallelism()) {
  // move on to the next source if it exists
}
```

This snippet is correct on its own, but `finishedReaders` is never cleared when 
changing to the next source. So while processing the second source, the 
`finishedReaders.size()` check evaluates to true as soon as the _first_ subtask 
finishes. The hybrid source then moves on to the next source if one exists, and 
any unsent records in the other subtasks are dropped.

Concrete example with three sources, two subtasks each:
 # subtask 0 finishes with the first source. `finishedReaders` has size 1
 # subtask 1 finishes with the first source. `finishedReaders` now has size 2, 
so the hybrid source moves on to the second source
 # subtask 1 finishes with the second source. `finishedReaders.add(1)` doesn't 
change the set; `finishedReaders` still has size 2. So the hybrid source moves 
on to the third source.
 # subtask 0 wasn't finished with the second source, but receives the 
notification to move on. Any unsent records are lost. *Data loss.*
 # this repeats for every source up to the last. At the last source there is 
nothing to switch to, so the race condition in step 3 cannot happen there

So step 3 is a race condition that drops records nondeterministically for every 
source except the first and the last.
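
The steps above can be replayed with a small toy model. This is only a sketch 
of the check quoted earlier, not Flink's actual `HybridSourceSplitEnumerator`: 
the class `StaleFinishedReadersDemo`, its fields, and its methods are 
hypothetical names made up for illustration, and the parallelism is passed in 
directly instead of being read from `context.currentParallelism()`.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the switch check; NOT Flink's real enumerator. */
class StaleFinishedReadersDemo {
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final int parallelism;
    private final int sourceCount;
    private int currentSourceIndex = 0;

    StaleFinishedReadersDemo(int parallelism, int sourceCount) {
        this.parallelism = parallelism;
        this.sourceCount = sourceCount;
    }

    /** Mirrors the quoted check; returns true if the model switched sources. */
    boolean onReaderFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        boolean hasNextSource = currentSourceIndex < sourceCount - 1;
        if (finishedReaders.size() == parallelism && hasNextSource) {
            // The set is deliberately left uncleared, as described in this report.
            currentSourceIndex++;
            return true;
        }
        return false;
    }

    int currentSourceIndex() {
        return currentSourceIndex;
    }

    public static void main(String[] args) {
        StaleFinishedReadersDemo e = new StaleFinishedReadersDemo(2, 3);
        e.onReaderFinished(0); // step 1: size 1, no switch
        e.onReaderFinished(1); // step 2: size 2, switch to the second source
        e.onReaderFinished(1); // step 3: size still 2, premature switch
        System.out.println("current source index: " + e.currentSourceIndex());
    }
}
```

In this model the third call already advances to the last source (index 2) 
even though subtask 0 never reported finishing the second one.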

In production this issue caused the loss of GBs to TBs of data depending on the 
sources. We fixed it in a private fork by clearing the `finishedReaders` set 
when changing to the next source.
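
For reference, the shape of that fix in a toy model looks roughly like the 
following. This is a sketch under assumptions, not the actual patch from the 
fork and not Flink's real code: `ClearedFinishedReadersDemo` and its members 
are hypothetical names, and the parallelism is passed in directly rather than 
taken from `context.currentParallelism()`.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the switch check with the set reset on every switch. */
class ClearedFinishedReadersDemo {
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final int parallelism;
    private final int sourceCount;
    private int currentSourceIndex = 0;

    ClearedFinishedReadersDemo(int parallelism, int sourceCount) {
        this.parallelism = parallelism;
        this.sourceCount = sourceCount;
    }

    /** Returns true if the model switched to the next source. */
    boolean onReaderFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        boolean hasNextSource = currentSourceIndex < sourceCount - 1;
        if (finishedReaders.size() == parallelism && hasNextSource) {
            // Forget who finished the previous source before moving on.
            finishedReaders.clear();
            currentSourceIndex++;
            return true;
        }
        return false;
    }

    int currentSourceIndex() {
        return currentSourceIndex;
    }

    public static void main(String[] args) {
        ClearedFinishedReadersDemo e = new ClearedFinishedReadersDemo(2, 3);
        e.onReaderFinished(0); // first source: size 1, no switch
        e.onReaderFinished(1); // first source: size 2, switch and clear
        e.onReaderFinished(1); // second source: size 1, no switch
        System.out.println("after subtask 1 only: " + e.currentSourceIndex());
        e.onReaderFinished(0); // second source: size 2, switch and clear
        System.out.println("after both subtasks: " + e.currentSourceIndex());
    }
}
```

With the set cleared on each switch, the model waits for both subtasks to 
finish the second source before advancing to the third.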



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
