[jira] [Commented] (FLINK-29459) Sink v2 has bugs in supporting legacy v1 implementations with global committer

2023-08-30 Thread Martijn Visser (Jira)


[ https://issues.apache.org/jira/browse/FLINK-29459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17760626#comment-17760626 ]

Martijn Visser commented on FLINK-29459:


[~gaoyunhaii] Sorry for the late reply. Let's continue the discussion here, 
since it doesn't involve an API change. I know that [~tzulitai] 
is also interested in this topic, so let us know what you think!

> Sink v2 has bugs in supporting legacy v1 implementations with global committer
> --
>
> Key: FLINK-29459
> URL: https://issues.apache.org/jira/browse/FLINK-29459
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.16.0, 1.17.0, 1.15.3
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Major
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> Currently, when supporting Sink implementations that use the version 1 
> interface, there are several issues when restoring from a checkpoint after failover:
>  # In the global committer operator, when restoring SubtaskCommittableManager, 
> the subtask id is replaced with that of the current operator. Originally the id 
> is the id of the sender task (0 ~ N - 1), but after restoring it is always 0, 
> the id of the single global committer subtask. This causes a duplicate key 
> exception during restoring.
>  # In the Committer operator, the subtaskId of CheckpointCommittableManagerImpl 
> is always restored to 0 after failover, for all subtasks. As a result, the 
> summary sent to the global committer carries the wrong subtask id.
>  # In the Committer operator, the checkpoint id of SubtaskCommittableManager is 
> always restored to 1 after failover. As a result, subsequent committables sent 
> to the global committer carry the wrong checkpoint id. 
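A minimal Python sketch of failure mode 1, under a deliberately simplified model: `SubtaskCommittableManager` below is a hypothetical stand-in that only borrows the name of the Flink class, and `index_by_subtask`, `restore_buggy` and `restore_fixed` are illustrative helpers, not Flink code. It shows why overwriting every restored sender id with the global committer's own subtask id (0) produces a duplicate key as soon as state from more than one sender is restored.

```python
from dataclasses import dataclass, field


@dataclass
class SubtaskCommittableManager:
    """Hypothetical stand-in (not Flink's class): per-sender state held by a committer."""
    subtask_id: int
    checkpoint_id: int
    committables: list = field(default_factory=list)


def index_by_subtask(managers):
    """Build a subtask_id -> manager map, rejecting duplicate keys."""
    result = {}
    for m in managers:
        if m.subtask_id in result:
            raise ValueError(f"Duplicate key {m.subtask_id}")
        result[m.subtask_id] = m
    return result


def restore_buggy(snapshot, current_subtask_id):
    # Illustrates the reported bug: the sender's id is overwritten with the id of
    # the restoring operator (0 for the single global committer subtask).
    return [SubtaskCommittableManager(current_subtask_id, m.checkpoint_id, list(m.committables))
            for m in snapshot]


def restore_fixed(snapshot):
    # Keeps the sender's original subtask id from the snapshot.
    return [SubtaskCommittableManager(m.subtask_id, m.checkpoint_id, list(m.committables))
            for m in snapshot]


# State snapshotted for three sender subtasks (0, 1, 2) at checkpoint 7.
snapshot = [SubtaskCommittableManager(i, 7, [f"c{i}"]) for i in range(3)]
print(sorted(index_by_subtask(restore_fixed(snapshot))))  # [0, 1, 2]
try:
    index_by_subtask(restore_buggy(snapshot, current_subtask_id=0))
except ValueError as e:
    print(e)  # Duplicate key 0
```

Keeping the sender's id from the snapshot, as `restore_fixed` does, avoids the collision in this model.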



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29459) Sink v2 has bugs in supporting legacy v1 implementations with global committer

2023-07-23 Thread Yun Gao (Jira)


[ https://issues.apache.org/jira/browse/FLINK-29459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746204#comment-17746204 ]

Yun Gao commented on FLINK-29459:
-

Hi [~martijnvisser], logically this does not involve an API change. Do you 
think we should move the discussion to the thread, or should we continue the 
discussion under this issue? Either works for me. 



[jira] [Commented] (FLINK-29459) Sink v2 has bugs in supporting legacy v1 implementations with global committer

2023-07-21 Thread Martijn Visser (Jira)


[ https://issues.apache.org/jira/browse/FLINK-29459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17745579#comment-17745579 ]

Martijn Visser commented on FLINK-29459:


[~gaoyunhaii] Should we restart the discussion on fixing this problem?



[jira] [Commented] (FLINK-29459) Sink v2 has bugs in supporting legacy v1 implementations with global committer

2022-10-24 Thread Yun Gao (Jira)


[ https://issues.apache.org/jira/browse/FLINK-29459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623149#comment-17623149 ]

Yun Gao commented on FLINK-29459:
-

Hi [~fpaul] [~KristoffSC], many thanks for fixing the issues, and sorry for 
missing the earlier notifications during the holiday. Regarding the 
current sink v2 mechanism, I have some further thoughts:

Currently we rely on the CommittableSummary and CommittableWithLineage messages 
to coordinate between Writers and Committers. For each checkpoint, each Writer 
subtask first emits a CommittableSummary to the Committers, which contains 
the number of Committables it will send. The Writer subtask then emits that number of 
CommittableWithLineage messages to the Committers. The Committers rely on the 
number in the summary to detect whether they have received all the Committables from 
each Writer subtask. However, this mechanism has some issues:
 # It only supports partitioners between Writer and Committer that route each 
source subtask to a single target, such as forward / rescale. If in the long run we 
want to support Committers with arbitrary parallelism, this may cause 
issues when the Writer and Committer have different parallelism. It also 
complicates things for authors of connectors that use the pre-commit topology 
(WithPreCommitTopology). 
 # With unaligned checkpoints and rescaling after recovery, if some 
CommittableSummary messages have already been processed and stored in the snapshot, but 
the corresponding CommittableWithLineage messages have been reassigned to other 
tasks, the number of Committables will be incorrect. 
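To make the count-based scheme and issue 1 concrete, here is a minimal Python model, assuming a single Writer subtask and a pluggable routing function. `CountingCommitter`, `run_counting` and `route` are hypothetical names; this is not Flink's committer operator, only an illustration of why a per-sender count works only when every record from a Writer subtask reaches the same Committer.

```python
from collections import defaultdict


class CountingCommitter:
    """Hypothetical model of the count-based scheme: a summary announces how
    many committables a sender will emit for a checkpoint, and the checkpoint
    is complete once that many have arrived from every announced sender."""

    def __init__(self):
        self.expected = {}                # (checkpoint, sender) -> announced count
        self.received = defaultdict(int)  # (checkpoint, sender) -> arrived count

    def on_summary(self, checkpoint, sender, count):
        self.expected[(checkpoint, sender)] = count

    def on_committable(self, checkpoint, sender):
        self.received[(checkpoint, sender)] += 1

    def is_complete(self, checkpoint):
        keys = [k for k in self.expected if k[0] == checkpoint]
        # No summary seen at all means the committer cannot decide completion.
        return bool(keys) and all(self.received[k] == self.expected[k] for k in keys)


def run_counting(num_committers, route):
    """Writer subtask 0 emits a summary (record 0) and then 2 committables
    (records 1 and 2) for checkpoint 1; route(i) picks the committer for record i."""
    committers = [CountingCommitter() for _ in range(num_committers)]
    committers[route(0)].on_summary(1, sender=0, count=2)
    for i in range(1, 3):
        committers[route(i)].on_committable(1, sender=0)
    return [c.is_complete(1) for c in committers]


# Forward-style routing: every record from the writer reaches one committer.
print(run_counting(1, lambda i: 0))      # [True]
# Round-robin over 2 committers: committer 0 sees only 1 of the 2 announced
# committables, and committer 1 never receives a summary.
print(run_counting(2, lambda i: i % 2))  # [False, False]
```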

One possible alternative: instead of relying on counts, each Writer would first 
emit its Committables, followed by a broadcast message that confirms the 
end of the checkpoint. A Committer would know that it has received all the 
Committables once it has received the confirmation messages from all upstream tasks. 
The mechanism is somewhat like how watermarks work. For the two issues above:


 # It would support all partitioners. 
 # For the unaligned checkpoint and rescaling case, we could simply commit all the 
Committables with the startup checkpoint id and ignore all confirmation messages for 
that checkpoint id on startup. We could then wait for the confirmation 
messages of the next checkpoint id to mark all previous checkpoints as 
finished. 
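The confirmation-based alternative can be sketched as a small Python model, assuming FIFO delivery per channel (so a marker arriving after a writer's committables on the same channel implies those committables have already arrived) and round-robin routing as an example of an arbitrary partitioner. `ConfirmingCommitter` and `run_confirming` are hypothetical names, not a proposed Flink API; the restart handling from point 2 is left out.

```python
from collections import defaultdict


class ConfirmingCommitter:
    """Hypothetical model of the proposed scheme: writers emit committables,
    then broadcast an end-of-checkpoint marker to every committer. A committer
    treats a checkpoint as complete once it holds a marker from each upstream
    writer, much like an operator that waits on all input channels."""

    def __init__(self, num_upstream):
        self.num_upstream = num_upstream
        self.pending = defaultdict(list)   # checkpoint -> buffered committables
        self.confirmed = defaultdict(set)  # checkpoint -> writers that confirmed

    def on_committable(self, checkpoint, committable):
        self.pending[checkpoint].append(committable)

    def on_end_of_checkpoint(self, checkpoint, writer):
        self.confirmed[checkpoint].add(writer)

    def is_complete(self, checkpoint):
        return len(self.confirmed[checkpoint]) == self.num_upstream


def run_confirming(num_writers, num_committers, per_writer=3):
    committers = [ConfirmingCommitter(num_writers) for _ in range(num_committers)]
    rr = 0
    for w in range(num_writers):
        for n in range(per_writer):
            # Arbitrary partitioning: round-robin over all committers.
            committers[rr % num_committers].on_committable(1, f"w{w}-c{n}")
            rr += 1
        for c in committers:
            # Broadcast the marker after this writer's committables.
            c.on_end_of_checkpoint(1, writer=w)
    return committers


cs = run_confirming(num_writers=2, num_committers=3)
print([c.is_complete(1) for c in cs])      # [True, True, True]
print(sum(len(c.pending[1]) for c in cs))  # 6
```

Unlike the count-based model, completion here depends only on the number of upstream writers, not on how committables were partitioned.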

What do you think? Sorry if I have overlooked something. 

> Sink v2 has bugs in supporting legacy v1 implementations with global committer
> --
>
> Key: FLINK-29459
> URL: https://issues.apache.org/jira/browse/FLINK-29459
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.16.0, 1.17.0, 1.15.2
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Major
> Fix For: 1.17.0, 1.15.3, 1.16.1
>
>
> Currently, when supporting Sink implementations that use the version 1 
> interface, there are several issues when restoring from a checkpoint after failover:
>  # In the global committer operator, when restoring SubtaskCommittableManager, 
> the subtask id is replaced with that of the current operator. Originally the id 
> is the id of the sender task (0 ~ N - 1), but after restoring it is always 0, 
> the id of the single global committer subtask. This causes a duplicate key 
> exception during restoring.
>  # In the Committer operator, the subtaskId of CheckpointCommittableManagerImpl 
> is always restored to 0 after failover, for all subtasks. As a result, the 
> summary sent to the global committer carries the wrong subtask id.
>  # In the Committer operator, the checkpoint id of SubtaskCommittableManager is 
> always restored to 1 after failover. As a result, subsequent committables sent 
> to the global committer carry the wrong checkpoint id. 





[jira] [Commented] (FLINK-29459) Sink v2 has bugs in supporting legacy v1 implementations with global committer

2022-10-20 Thread Krzysztof Chmielewski (Jira)


[ https://issues.apache.org/jira/browse/FLINK-29459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17621187#comment-17621187 ]

Krzysztof Chmielewski commented on FLINK-29459:
---

FYI, tickets FLINK-29509, FLINK-29512 and FLINK-29627 fix the issues with 
TaskManager recovery for the Sink architecture with a global committer.

FLINK-29583 is about recovering the committer state of Flink 1.14 unified sinks 
and migrating it to the extended unified model.



[jira] [Commented] (FLINK-29459) Sink v2 has bugs in supporting legacy v1 implementations with global committer

2022-10-05 Thread Fabian Paul (Jira)


[ https://issues.apache.org/jira/browse/FLINK-29459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17612982#comment-17612982 ]

Fabian Paul commented on FLINK-29459:
-

The first and second points also look like the same problem, don't they?



[jira] [Commented] (FLINK-29459) Sink v2 has bugs in supporting legacy v1 implementations with global committer

2022-10-05 Thread Fabian Paul (Jira)


[ https://issues.apache.org/jira/browse/FLINK-29459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17612980#comment-17612980 ]

Fabian Paul commented on FLINK-29459:
-

[~gaoyunhaii] Thanks for your analysis. I am currently looking into the issues, 
and I think it is a good idea to split the different problems into separate 
tickets. 

I have already created https://issues.apache.org/jira/browse/FLINK-29509 to fix the 
subtask id problem during recovery.

Let me know if you have already started on that.
