[jira] [Updated] (FLINK-33170) HybridSourceSplitEnumerator causes dropped records in Hybrid Sources with 3+ sources

2023-10-03 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-33170:
---
Component/s: Connectors / HybridSource
 (was: Connectors / Common)

> HybridSourceSplitEnumerator causes dropped records in Hybrid Sources with 3+ 
> sources
> 
>
> Key: FLINK-33170
> URL: https://issues.apache.org/jira/browse/FLINK-33170
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HybridSource
> Affects Versions: 1.15.4, 1.16.2, 1.17.1, 1.19.0, 1.18.1
> Reporter: Robert Hoyt
> Priority: Critical
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> Possibly related to FLINK-27916.
> Priority labeled critical because this issue can cause major data loss, in 
> our experience on the order of GBs to TBs.
>  
> In all versions since 1.15.x there's a subtle bug in 
> {{HybridSourceSplitEnumerator}} when determining if it's time to move on to 
> the next source:
> {code:java}
> finishedReaders.add(subtaskId);
> if (finishedReaders.size() == context.currentParallelism()) {
>   // move on to the next source if it exists
> {code}
> This snippet is correct, but when changing to the next source, 
> {{finishedReaders}} is never cleared. So when processing the second source, 
> the {{finishedReaders.size()}} check will return true when the _first_ 
> subtask finishes.** The hybrid source moves on to the next source if one 
> exists, so any records remaining to be read and sent in the other 
> {{numSubtasks - 1}} subtasks will get dropped.
>  
> {{**}} if each of the sources in the hybrid source has the same parallelism. 
> If any source except the last has lower parallelism then I suspect that the 
> source will never move on: it's impossible for {{finishedReaders.size()}} to 
> shrink.
>  
> Concrete example with three sources, two subtasks each:
>  # subtask 0 finishes with the first source. {{finishedReaders}} has size 1
>  # subtask 1 finishes with the first source. {{finishedReaders}} has size 2 
> now, and the hybrid source moves on to the second source
>  # subtask 1 finishes with the second source. {{finishedReaders.add(1)}} 
> doesn't change the set; {{finishedReaders}} still has size 2. So the hybrid 
> source moves on to the third source.
>  # subtask 0 wasn't finished with the second source, but receives the 
> notification to move on. Any unsent records are lost. *Data loss!*
>  # this continues until the last source. The hybrid source doesn't switch 
> over once it is at the last source, so the race condition in step 3 can't 
> happen there
>  
> So step 3 is a race condition that drops records nondeterministically for 
> every source except the first and the last.
> In production this issue caused the loss of GBs to TBs of data when a hybrid 
> source had the following:
>  * 3-5 underlying sources, each of which should emit 100 GB to 10 TB worth of 
> records
>  * all sources had the same number of splits, around 64-256
> We fixed it in a private fork by clearing the {{finishedReaders}} set when 
> changing to the next source.
> Existing tests don't catch this data race because, as far as I understand 
> them, they:
>  * use two mock sources, whereas this bug manifests for 3+ sources
>  * have sources with parallelism 1, while this bug manifests when the sources 
> have parallelism > 1



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33170) HybridSourceSplitEnumerator causes dropped records in Hybrid Sources with 3+ sources

2023-10-01 Thread Robert Hoyt (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Hoyt updated FLINK-33170:

Description: 
Possibly related to FLINK-27916.

Priority labeled critical because this issue can cause major data loss, in our 
experience on the order of GBs to TBs.

 

In all versions since 1.15.x there's a subtle bug in 
{{HybridSourceSplitEnumerator}} when determining if it's time to move on to the 
next source:
{code:java}
finishedReaders.add(subtaskId);
if (finishedReaders.size() == context.currentParallelism()) {
  // move on to the next source if it exists
{code}
This snippet is correct, but when changing to the next source, 
{{finishedReaders}} is never cleared. So when processing the second source, the 
{{finishedReaders.size()}} check will return true when the _first_ subtask 
finishes.** The hybrid source moves on to the next source if one exists, so any 
records remaining to be read and sent in the other {{numSubtasks - 1}} subtasks 
will get dropped.

 

{{**}} if each of the sources in the hybrid source has the same parallelism. If 
any source except the last has lower parallelism then I suspect that the source 
will never move on: it's impossible for {{finishedReaders.size()}} to shrink.

 

Concrete example with three sources, two subtasks each:
 # subtask 0 finishes with the first source. {{finishedReaders}} has size 1
 # subtask 1 finishes with the first source. {{finishedReaders}} has size 2 
now, and the hybrid source moves on to the second source
 # subtask 1 finishes with the second source. {{finishedReaders.add(1)}} doesn't 
change the set; {{finishedReaders}} still has size 2. So the hybrid source 
moves on to the third source.
 # subtask 0 wasn't finished with the second source, but receives the 
notification to move on. Any unsent records are lost. *Data loss!*
 # this continues until the last source. The hybrid source doesn't switch over 
once it is at the last source, so the race condition in step 3 can't happen there

 

So step 3 is a race condition that drops records nondeterministically for every 
source except the first and the last.
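
The walkthrough above can be replayed with a small stand-alone model. This is only a sketch and not Flink code: the class {{StaleFinishedReadersDemo}}, its fields, and {{onReaderFinished}} are hypothetical names, and a fixed {{parallelism}} stands in for {{context.currentParallelism()}}.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the reader bookkeeping described above; not the Flink class. */
class StaleFinishedReadersDemo {
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final int parallelism;   // stand-in for context.currentParallelism()
    private final int sourceCount;   // number of underlying sources
    private int currentSourceIndex = 0;

    StaleFinishedReadersDemo(int parallelism, int sourceCount) {
        this.parallelism = parallelism;
        this.sourceCount = sourceCount;
    }

    /** Returns true if this event made the model switch to the next source. */
    boolean onReaderFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() == parallelism && currentSourceIndex < sourceCount - 1) {
            // The set is deliberately NOT cleared here, mirroring the reported bug.
            currentSourceIndex++;
            return true;
        }
        return false;
    }

    int currentSourceIndex() {
        return currentSourceIndex;
    }

    public static void main(String[] args) {
        // Three sources, two subtasks each, as in the example.
        StaleFinishedReadersDemo model = new StaleFinishedReadersDemo(2, 3);
        model.onReaderFinished(0);                 // step 1: size 1, no switch
        model.onReaderFinished(1);                 // step 2: size 2, switch to second source
        boolean early = model.onReaderFinished(1); // step 3: only subtask 1 is done
        System.out.println("switched early: " + early
                + ", current source index: " + model.currentSourceIndex());
    }
}
```

In this model the third call switches to source index 2 even though subtask 0 never reported finishing the second source, which is the situation described in step 4.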

In production this issue caused the loss of GBs to TBs of data when a hybrid 
source had the following:
 * 3-5 underlying sources, each of which should emit 100 GB to 10 TB worth of 
records
 * all sources had the same number of splits, around 64-256

We fixed it in a private fork by clearing the {{finishedReaders}} set when 
changing to the next source.
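
A minimal sketch of what clearing the set on a switch could look like, using the same kind of toy model. This is not the actual patch from the private fork: {{ClearedFinishedReadersDemo}} and its members are hypothetical names, and the real enumerator does more work when it switches sources.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model that resets the set on every switch; names are hypothetical, not Flink's. */
class ClearedFinishedReadersDemo {
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final int parallelism;   // stand-in for context.currentParallelism()
    private final int sourceCount;
    private int currentSourceIndex = 0;

    ClearedFinishedReadersDemo(int parallelism, int sourceCount) {
        this.parallelism = parallelism;
        this.sourceCount = sourceCount;
    }

    /** Returns true if this event made the model switch to the next source. */
    boolean onReaderFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() == parallelism && currentSourceIndex < sourceCount - 1) {
            currentSourceIndex++;
            // Start counting from zero again for the next source.
            finishedReaders.clear();
            return true;
        }
        return false;
    }

    int currentSourceIndex() {
        return currentSourceIndex;
    }

    public static void main(String[] args) {
        ClearedFinishedReadersDemo model = new ClearedFinishedReadersDemo(2, 3);
        model.onReaderFinished(0);
        model.onReaderFinished(1);                  // first source done, switch to second
        boolean early = model.onReaderFinished(1);  // only subtask 1 done with second source
        boolean onTime = model.onReaderFinished(0); // now both subtasks are done
        System.out.println("early switch: " + early
                + ", switch after both finished: " + onTime
                + ", current source index: " + model.currentSourceIndex());
    }
}
```

With the set cleared, the model waits for both subtasks before moving from the second source to the third.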

Existing tests don't catch this data race because, as far as I understand them, 
they:
 * use two mock sources, whereas this bug manifests for 3+ sources
 * have sources with parallelism 1, while this bug manifests when the sources 
have parallelism > 1
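
To illustrate why those two test shapes would miss the problem, here is a rough sketch that counts premature switches in a toy model for different source counts and parallelisms. It does not use Flink's test utilities or mock sources; {{PrematureSwitchCheck}} and {{countPrematureSwitches}} are made-up names, and the schedule (subtasks report in order 0, 1, ... and stop reporting on a source once a switch happens) is an assumption chosen for simplicity.

```java
import java.util.HashSet;
import java.util.Set;

/** Counts premature source switches in a toy model of the stale-set behaviour. */
class PrematureSwitchCheck {

    static int countPrematureSwitches(int sourceCount, int parallelism) {
        Set<Integer> finishedReaders = new HashSet<>(); // never cleared, as in the report
        int premature = 0;
        int current = 0;
        while (current < sourceCount - 1) {             // no switch happens at the last source
            Set<Integer> doneWithCurrent = new HashSet<>(); // ground truth for this source
            boolean switched = false;
            for (int subtask = 0; subtask < parallelism && !switched; subtask++) {
                doneWithCurrent.add(subtask);
                finishedReaders.add(subtask);
                if (finishedReaders.size() == parallelism) {
                    if (doneWithCurrent.size() < parallelism) {
                        premature++; // switched before every subtask finished this source
                    }
                    current++;
                    switched = true;
                }
            }
        }
        return premature;
    }

    public static void main(String[] args) {
        System.out.println("2 sources, parallelism 2: " + countPrematureSwitches(2, 2));
        System.out.println("3 sources, parallelism 1: " + countPrematureSwitches(3, 1));
        System.out.println("3 sources, parallelism 2: " + countPrematureSwitches(3, 2));
    }
}
```

In this model only the third configuration reports a premature switch, which matches the observation that a test needs at least three sources and parallelism greater than 1 to see the bug.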


[jira] [Updated] (FLINK-33170) HybridSourceSplitEnumerator causes dropped records in Hybrid Sources with 3+ sources

2023-10-01 Thread Robert Hoyt (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Hoyt updated FLINK-33170:

Description: 
Possibly related to FLINK-27916.

Priority labeled critical because this issue can cause major data loss, in our 
experience on the order of GBs to TBs.

 

In all versions since 1.15.x there's a subtle bug in 
{{HybridSourceSplitEnumerator}} when determining if it's time to move on to the 
next source:
{code:java}
finishedReaders.add(subtaskId);
if (finishedReaders.size() == context.currentParallelism()) {
  // move on to the next source if it exists
{code}
This snippet is correct, but when changing to the next source, 
{{finishedReaders}} is never cleared. So when processing the second source, the 
{{finishedReaders.size()}} check will return true when the _first_ subtask 
finishes.** The hybrid source moves on to the next source if one exists, so any 
unsent records in other subtasks will get dropped.

 

{{**}} if each of the sources in the hybrid source has the same parallelism. If 
any source except the last has lower parallelism then I suspect that the source 
will never move on: it's impossible for {{finishedReaders.size()}} to shrink.

 

Concrete example with three sources, two subtasks each:
 # subtask 0 finishes with the first source. {{finishedReaders}} has size 1
 # subtask 1 finishes with the first source. {{finishedReaders}} has size 2 
now, and the hybrid source moves on to the second source
 # subtask 1 finishes with the second source. {{finishedReaders.add(1)}} doesn't 
change the set; {{finishedReaders}} still has size 2. So the hybrid source 
moves on to the third source.
 # subtask 0 wasn't finished with the second source, but receives the 
notification to move on. Any unsent records are lost. *Data loss!*
 # this continues until the last source. The hybrid source doesn't switch over 
once it is at the last source, so the race condition in step 3 can't happen there

 

So step 3 is a race condition that drops records nondeterministically for every 
source except the first and the last.

In production this issue caused the loss of GBs to TBs of data depending on the 
sources. We fixed it in a private fork by clearing the {{finishedReaders}} set 
when changing to the next source.


[jira] [Updated] (FLINK-33170) HybridSourceSplitEnumerator causes dropped records in Hybrid Sources with 3+ sources

2023-10-01 Thread Robert Hoyt (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Hoyt updated FLINK-33170:

Description: 
Possibly related to FLINK-27916.

 

In all versions since 1.15.x there's a subtle bug in 
{{HybridSourceSplitEnumerator}} when determining if it's time to move on to the 
next source:
{code:java}
finishedReaders.add(subtaskId);
if (finishedReaders.size() == context.currentParallelism()) {
  // move on to the next source if it exists
{code}
This snippet is correct, but when changing to the next source, 
{{finishedReaders}} is never cleared. So when processing the second source, the 
{{finishedReaders.size()}} check will return true when the _first_ subtask 
finishes.** The hybrid source moves on to the next source if one exists, so any 
unsent records in other subtasks will get dropped.

 

{{**}} if each of the sources in the hybrid source has the same parallelism. If 
any source except the last has lower parallelism then I suspect that the source 
will never move on: it's impossible for {{finishedReaders.size()}} to shrink.

 

Concrete example with three sources, two subtasks each:
 # subtask 0 finishes with the first source. {{finishedReaders}} has size 1
 # subtask 1 finishes with the first source. {{finishedReaders}} has size 2 
now, and the hybrid source moves on to the second source
 # subtask 1 finishes with the second source. {{finishedReaders.add(1)}} doesn't 
change the set; {{finishedReaders}} still has size 2. So the hybrid source 
moves on to the third source.
 # subtask 0 wasn't finished with the second source, but receives the 
notification to move on. Any unsent records are lost. *Data loss!*
 # this continues until the last source. The hybrid source doesn't switch over 
once it is at the last source, so the race condition in step 3 can't happen there

 

So step 3 is a race condition that drops records nondeterministically for every 
source except the first and the last.

In production this issue caused the loss of GBs to TBs of data depending on the 
sources. We fixed it in a private fork by clearing the {{finishedReaders}} set 
when changing to the next source.


[jira] [Updated] (FLINK-33170) HybridSourceSplitEnumerator causes dropped records in Hybrid Sources with 3+ sources

2023-10-01 Thread Robert Hoyt (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Hoyt updated FLINK-33170:

Description: 
Possibly related to FLINK-27916.

 

In all versions since 1.15.x there's a subtle bug in 
{{HybridSourceSplitEnumerator}} when determining if it's time to move on to the 
next source:
{code:java}
finishedReaders.add(subtaskId);
if (finishedReaders.size() == context.currentParallelism()) {
  // move on to the next source if it exists
{code}
This snippet is correct, but when changing to the next source, 
{{finishedReaders}} is never cleared. So when processing the second source, the 
{{finishedReaders.size()}} check will return true when the _first_ subtask 
finishes.** The hybrid source moves on to the next source if one exists, so any 
unsent records in other subtasks will get dropped.

 

{{**}} if each of the sources in the hybrid source has the same parallelism. If 
any source except the last has lower parallelism then I suspect that the source 
will never move on: it's impossible for `finishedReaders.size()` to shrink.

 

Concrete example with three sources, two subtasks each:
 # subtask 0 finishes with the first source. {{finishedReaders}} has size 1
 # subtask 1 finishes with the first source. {{finishedReaders}} has size 2 
now, and the hybrid source moves on to the second source
 # subtask 1 finishes with the second source. {{finishedReaders.add(1)}} doesn't 
change the set; {{finishedReaders}} still has size 2. So the hybrid source 
moves on to the third source.
 # subtask 0 wasn't finished with the second source, but receives the 
notification to move on. Any unsent records are lost. *Data loss!*
 # this continues until the last source. The hybrid source doesn't switch over 
once it is at the last source, so the race condition in step 3 can't happen there

 

So step 3 is a race condition that drops records nondeterministically for every 
source except the first and the last.

In production this issue caused the loss of GBs to TBs of data depending on the 
sources. We fixed it in a private fork by clearing the {{finishedReaders}} set 
when changing to the next source.


[jira] [Updated] (FLINK-33170) HybridSourceSplitEnumerator causes dropped records in Hybrid Sources with 3+ sources

2023-09-29 Thread Robert Hoyt (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Hoyt updated FLINK-33170:

Description: 
Possibly related to FLINK-27916.

 

In all versions since 1.15.x there's a subtle bug in 
{{HybridSourceSplitEnumerator}} when determining if it's time to move on to the 
next source:
{code:java}
finishedReaders.add(subtaskId);
if (finishedReaders.size() == context.currentParallelism()) {
  // move on to the next source if it exists
{code}
This snippet is correct, but when changing to the next source, 
{{finishedReaders}} is never cleared. So when processing the second source, the 
{{finishedReaders.size()}} check will return true when the _first_ subtask 
finishes.** The hybrid source moves on to the next source if one exists, so any 
unsent records in other subtasks will get dropped.

 

** if each of the sources in the hybrid source has the same parallelism. If any 
source except the last has lower parallelism then I suspect that the source 
will never move on: it's impossible for `finishedReaders.size()` to shrink.

 

Concrete example with three sources, two subtasks each:
 # subtask 0 finishes with the first source. {{finishedReaders}} has size 1
 # subtask 1 finishes with the first source. {{finishedReaders}} has size 2 
now, and the hybrid source moves on to the second source
 # subtask 1 finishes with the second source. {{finishedReaders.add(1)}} doesn't 
change the set; {{finishedReaders}} still has size 2. So the hybrid source 
moves on to the third source.
 # subtask 0 wasn't finished with the second source, but receives the 
notification to move on. Any unsent records are lost. *Data loss!*
 # this continues until the last source. The hybrid source doesn't switch over 
once it is at the last source, so the race condition in step 3 can't happen there

 

So step 3 is a race condition that drops records nondeterministically for every 
source except the first and the last.

In production this issue caused the loss of GBs to TBs of data depending on the 
sources. We fixed it in a private fork by clearing the {{finishedReaders}} set 
when changing to the next source.


[jira] [Updated] (FLINK-33170) HybridSourceSplitEnumerator causes dropped records in Hybrid Sources with 3+ sources

2023-09-29 Thread Robert Hoyt (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Hoyt updated FLINK-33170:

Description: 
Possibly related to FLINK-27916.

 

In all versions since 1.15.x there's a subtle bug in 
HybridSourceSplitEnumerator when determining if it's time to move on to the 
next source:
{code:java}
finishedReaders.add(subtaskId);
if (finishedReaders.size() == context.currentParallelism()) {
  // move on to the next source if it exists
{code}
This snippet is correct, but when changing to the next source, 
`finishedReaders` is never cleared. So when processing the second source, the 
`finishedReaders.size()` check will return true when the _first_ subtask 
finishes. The hybrid source moves on to the next source if one exists, so any 
unsent records in other subtasks will get dropped.

 

Concrete example with three sources, two subtasks each:
 # subtask 0 finishes with the first source. `finishedReaders` has size 1
 # subtask 1 finishes with the first source. `finishedReaders` has size 2 now, 
and the hybrid source moves on to the second source
 # subtask 1 finishes with the second source. `finishedReaders.add(1)` doesn't 
change the set; `finishedReaders` still has size 2. So the hybrid source moves 
on to the third source.
 # subtask 0 wasn't finished with the second source, but receives the 
notification to move on. Any unsent records are lost. *Data loss.*
 # this continues until the last source. The hybrid source doesn't switch over 
once it is at the last source, so the race condition in step 3 can't happen there

 

So step 3 is a race condition that drops records nondeterministically for every 
source except the first and the last.

In production this issue caused the loss of GBs to TBs of data depending on the 
sources. We fixed it in a private fork by clearing the `finishedReaders` set 
when changing to the next source.

  was:
Possibly related to FLINK-27916.

 

In all versions since 1.15.x there's a subtle bug in 
`HybridSourceSplitEnumerator` when determining if it's time to move on to the 
next source:

```
finishedReaders.add(subtaskId);
if (finishedReaders.size() == context.currentParallelism()) {
  // move on to the next source if it exists
```

This snippet is correct, but when changing to the next source, 
`finishedReaders` is never cleared. So when processing the second source, the 
`finishedReaders.size()` check will return true when the _first_ subtask 
finishes. The hybrid source moves on to the next source if one exists, so any 
unsent records in other subtasks will get dropped.

 

Concrete example with three sources, two subtasks each:
 # subtask 0 finishes with the first source. `finishedReaders` has size 1
 # subtask 1 finishes with the first source. `finishedReaders` has size 2 now, 
and the hybrid source moves on to the second source
 # subtask 1 finishes with the second source. `finishedReaders.add(1)` doesn't 
change the set; `finishedReaders` still has size 2. So the hybrid source moves 
on to the third source.
 # subtask 0 wasn't finished with the second source, but receives the 
notification to move on. Any unsent records are lost. *Data loss.*
 # this continues until the last source. The hybrid source doesn't switch over 
once it is at the last source, so the race condition in step 3 can't happen there

 

So step 3 is a race condition that drops records nondeterministically for every 
source except the first and the last.

In production this issue caused the loss of GBs to TBs of data depending on the 
sources. We fixed it in a private fork by clearing the `finishedReaders` set 
when changing to the next source.

