Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi all, I tested the previous PoC against the current tests and found some new issues that might cause divergence; apologies that there may also be some reversals of earlier conclusions.

1. Which operators should wait for one more checkpoint before close?

One motivation for this FLIP is to ensure a 2PC sink commits the last part of its data before it is closed, which requires the sink operator to wait for one more checkpoint, i.e., onEndOfInput() -> waitForCheckpoint() -> notifyCheckpointComplete() -> close(). This leads to the question of which operators should wait for that checkpoint. Possible options are:

a. Make all operators (or UDFs) that implement the notifyCheckpointComplete method wait for one more checkpoint. One exception: since for a legacy source operator we can only snapshot either one or all of its tasks to avoid data repetition [1], we could not support legacy source operators and their chained operators waiting for checkpoints, because there would be a deadlock if part of the tasks have finished. This would finally be resolved once legacy sources are deprecated. The PoC uses this option for now.

b. Make only operators (or UDFs) that implement a special marker interface wait for one more checkpoint.

2. Do we need to handle tasks that finish before being triggered?

Previously I thought we could postpone this, but during testing I found it can cause problems: by default a checkpoint failure triggers job failover, and the job then has to wait another interval before the next checkpoint is triggered. To pass the tests I updated the PoC to include this part; we may want to reconsider whether to include it or use some other option.

3. How to extend a new format for the checkpoint metadata?

Sorry, I previously gave a wrong estimation: after I extracted a sub-component for (de)serializing operator state, the problem simply moves into the new OperatorStateSerializer.
The problem is that v2, v3 and v4 have different fields and therefore use different (de)serialization processes, which differs from the case where we have fixed steps and each step merely has different logic. Thus we might either:

a. Use base classes shared between every two versions.

b. Have a unified framework containing all possible fields across all versions, and use an empty field serializer to skip the fields a given version does not have.

Best,
Yun

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish

-- From:Yun Gao Send Time:2020 Dec. 16 (Wed.) 11:07 To:Aljoscha Krettek ; dev ; user Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Aljoscha,

Many thanks for the feedback! For the remaining issues:

> 1. You mean we would insert "artificial" barriers for barrier 2 in case we receive EndOfPartition while other inputs have already received barrier 2? I think that makes sense, yes.

Yes, exactly. I would like to insert "artificial" barriers in case we receive EndOfPartition while other inputs have already received barrier 2, and also for the similar cases where some input channels receive EndOfPartition while checkpoint 2 is ongoing, or where a task receives the checkpoint trigger directly after all of its precedent tasks have finished but before it has received their EndOfPartition.

> 3. This indeed seems complex. Maybe we could switch to using composition instead of inheritance to make this more extensible?

I re-checked the code and now think composition would be better: it avoids a complex inheritance hierarchy by exposing the changed part, `(de)serializeOperatorState`. I'll update the PoC to change this part. Many thanks for the suggestion!

> 4. Don't we currently have the same problem? Even right now source tasks and non-source tasks behave differently when it comes to checkpoints.
> Are you saying we should fix that or would the new work introduce even more duplicate code?

Currently, since we never trigger non-source tasks, the triggerCheckpoint logic is implemented in the base StreamTask class and is only used by the source tasks. However, after the change, non-source tasks would also get triggered, with a different behavior, so we might not be able to keep this pattern.

Best,
Yun

-- From:Aljoscha Krettek Send Time:2020 Dec. 15 (Tue.) 18:11 To:dev Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for the thorough update! I'll answer inline.

On 14.12.20 16:33, Yun Gao wrote:
> 1. To include EndOfPartition into consideration for barrier alignment at
> the TM side, we now tend to decouple the logic for EndOfPartition with the
> normal
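Not part of the thread, but to make option 1(a) above concrete, here is a minimal, self-contained sketch of the onEndOfInput() -> waitForCheckpoint() -> notifyCheckpointComplete() -> close() sequence. All class, method, and field names here are hypothetical illustrations, not actual Flink API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a 2PC-style sink that defers close() until the
// checkpoint taken after end-of-input has completed, so the last part of
// the data is committed before the operator is closed.
public class TwoPhaseCommitSinkSketch {
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();
    private boolean inputEnded = false;
    private boolean closed = false;
    private long awaitedCheckpointId = -1;

    void processElement(String value) {
        pending.add(value);
    }

    // End of input: the last part of data is still uncommitted, so the
    // operator must wait for one more checkpoint instead of closing now.
    void onEndOfInput(long nextCheckpointId) {
        inputEnded = true;
        awaitedCheckpointId = nextCheckpointId;
    }

    // Second phase of 2PC: commit everything covered by this checkpoint.
    void notifyCheckpointComplete(long checkpointId) {
        committed.addAll(pending);
        pending.clear();
        // Only after the checkpoint triggered after end-of-input completes
        // is it safe to close the operator.
        if (inputEnded && checkpointId >= awaitedCheckpointId) {
            closed = true;
        }
    }

    boolean isClosed() {
        return closed;
    }

    List<String> getCommitted() {
        return committed;
    }

    public static void main(String[] args) {
        TwoPhaseCommitSinkSketch sink = new TwoPhaseCommitSinkSketch();
        sink.processElement("a");
        sink.notifyCheckpointComplete(1); // commits "a", keeps running
        sink.processElement("b");
        sink.onEndOfInput(2);             // "b" must survive into checkpoint 2
        sink.notifyCheckpointComplete(2); // commits "b", then closes
        System.out.println(sink.getCommitted() + " closed=" + sink.isClosed());
    }
}
```

Without the extra wait, "b" would be lost or left uncommitted when close() ran right after end-of-input, which is exactly the motivating scenario for option 1(a).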
[jira] [Created] (FLINK-20767) add nested field support for SupportsFilterPushDown
Jun Zhang created FLINK-20767: - Summary: add nested field support for SupportsFilterPushDown Key: FLINK-20767 URL: https://issues.apache.org/jira/browse/FLINK-20767 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.12.0 Reporter: Jun Zhang Fix For: 1.13.0 I think we should add the nested field support for SupportsFilterPushDown -- This message was sent by Atlassian Jira (v8.3.4#803005)
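To illustrate what nested field support would mean for filter push-down, here is a hypothetical sketch (plain strings, not the actual Flink planner or the real SupportsFilterPushDown interface): a source that only understands top-level references must leave a predicate on a nested path in the planner, while one with nested support could accept the whole dotted path.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of filter push-down: filters are field paths, where a
// dot marks a nested reference (e.g. "user.address.city"). The source
// accepts a filter only if it is top-level or nested support is enabled.
public class NestedFilterPushDownSketch {
    // Returns the filters the source accepts; the rest stay in the planner.
    static List<String> applyFilters(List<String> fieldPaths, boolean supportsNested) {
        List<String> accepted = new ArrayList<>();
        for (String path : fieldPaths) {
            boolean nested = path.contains(".");
            if (!nested || supportsNested) {
                accepted.add(path);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> filters = List.of("status", "user.address.city");
        System.out.println(applyFilters(filters, false)); // [status]
        System.out.println(applyFilters(filters, true));  // [status, user.address.city]
    }
}
```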
[jira] [Created] (FLINK-20766) Separate the implementation of stream sort nodes
Wenlong Lyu created FLINK-20766: --- Summary: Separate the implementation of stream sort nodes Key: FLINK-20766 URL: https://issues.apache.org/jira/browse/FLINK-20766 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Wenlong Lyu Fix For: 1.13.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20765) ScalarOperatorGens doesn't set proper nullability for result type of generated expressions
Rui Li created FLINK-20765: -- Summary: ScalarOperatorGens doesn't set proper nullability for result type of generated expressions Key: FLINK-20765 URL: https://issues.apache.org/jira/browse/FLINK-20765 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Rui Li -- This message was sent by Atlassian Jira (v8.3.4#803005)
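As a hypothetical illustration of the nullability rule this issue is about (this is not the actual ScalarOperatorGens code): for most scalar expressions, the generated result type should be nullable if and only if at least one operand is nullable.

```java
// Sketch of the usual SQL nullability-propagation rule for scalar
// expressions: the result is nullable iff any operand is nullable.
public class ResultNullabilitySketch {
    static boolean resultNullable(boolean... operandNullability) {
        for (boolean nullable : operandNullability) {
            if (nullable) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // a + b with two NOT NULL operands should yield a NOT NULL result...
        System.out.println(resultNullable(false, false)); // false
        // ...but becomes nullable as soon as one operand is nullable.
        System.out.println(resultNullable(false, true));  // true
    }
}
```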
Re: Insufficient number of network buffers after restarting
Hi, Yufei. Can you reproduce this issue in 1.10.0? The deterministic slot sharing introduced in 1.12.0 is one possible reason. Before 1.12.0, the distribution of tasks among slots was not deterministic: even when the network buffers are sufficient from the perspective of the whole cluster, a bad distribution of tasks can still lead to "insufficient network buffers". Best, Yangze Guo

On Fri, Dec 25, 2020 at 12:54 AM Yufei Liu wrote:
>
> Hey,
> I've found that the job throws "java.io.IOException: Insufficient number of
> network buffers: required 51, but only 1 available" after job restart, and
> I've observed the TM using many more network buffers than before.
> My internal branch based on 1.10.0 can easily reproduce this, but with 1.12.0
> I don't see the issue. I think it may have already been fixed by some PR; I'm
> curious what can lead to this problem?
>
> Best.
> YuFei.
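A back-of-the-envelope sketch of why task distribution matters here: each input gate needs roughly channels * buffers-per-channel exclusive buffers plus a floating budget per gate, so packing more network-heavy tasks into one TaskManager multiplies the demand. The default values below (2 per channel, 8 floating per gate) are assumptions taken from the Flink buffer configuration option names, not read from any specific version:

```java
// Rough per-TM network buffer demand: gates * (channels * exclusive + floating).
public class NetworkBufferEstimate {
    static int requiredBuffers(int gates, int channelsPerGate,
                               int buffersPerChannel, int floatingPerGate) {
        return gates * (channelsPerGate * buffersPerChannel + floatingPerGate);
    }

    public static void main(String[] args) {
        // 2 input gates with 16 channels each, versus 6 gates landing on the
        // same TM after a non-deterministic reschedule: demand triples.
        System.out.println(requiredBuffers(2, 16, 2, 8)); // 80
        System.out.println(requiredBuffers(6, 16, 2, 8)); // 240
    }
}
```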
Insufficient number of network buffers after restarting
Hey, I've found that the job throws "java.io.IOException: Insufficient number of network buffers: required 51, but only 1 available" after job restart, and I've observed the TM using many more network buffers than before. My internal branch based on 1.10.0 can easily reproduce this, but with 1.12.0 I don't see the issue. I think it may have already been fixed by some PR; I'm curious what can lead to this problem? Best. YuFei.
[jira] [Created] (FLINK-20763) canal format parse update record with null value get wrong result
WangRuoQi created FLINK-20763: - Summary: canal format parse update record with null value get wrong result Key: FLINK-20763 URL: https://issues.apache.org/jira/browse/FLINK-20763 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.11.2 Reporter: WangRuoQi Attachments: canal_format.patch

When I use the canal format to consume the MySQL binlog with a query like this:
{code:java}
select ymd, count(order_no), count(*) from order_table where status >= 3 group by ymd;
{code}
I get results like this:
{code:java}
(20201212,10,10) .. (20201212,20,24) .. (20201212,100,130) ..
{code}
I am sure that when status >= 3 every record has a valid order_no, yet count(order_no) and count(*) differ. I found the cause while debugging:
{code:java}
insert into order_table(ymd,order_no,status) values(20201212,null,1);
-- +I(20201212,null,1)
update order_table set order_no=123,status=3 where id=1;
-- -U(20201212,123,1)
-- +U(20201212,123,3)
{code}
So the canal format has a bug when parsing update records. The source code logic is:
{code:java}
} else if (OP_UPDATE.equals(type)) {
    // "data" field is an array of rows, containing the new rows
    ArrayData data = row.getArray(0);
    // "old" field is an array of rows, containing the old values
    ArrayData old = row.getArray(1);
    for (int i = 0; i < data.size(); i++) {
        // the underlying JSON deserialization schema always produces GenericRowData
        GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
        GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
        for (int f = 0; f < fieldCount; f++) {
            if (before.isNullAt(f)) {
                // non-null fields in "old" (before) mean the fields changed;
                // null/empty fields in "old" (before) mean the fields did not change,
                // so we copy the unchanged fields into before
                before.setField(f, after.getField(f));
            }
        }
        before.setRowKind(RowKind.UPDATE_BEFORE);
        after.setRowKind(RowKind.UPDATE_AFTER);
        out.collect(before);
        out.collect(after);
{code}
When a field in "old" has a null value, it is overwritten by the value from the new record, which leads the aggregation to a wrong result.

I tried to fix this bug with the following logic: for each field, use the old value whenever the old row contains that field, whether the value is null or not; use the new value by default. I hope this bug will be fixed in a future version. [^canal_format.patch] -- This message was sent by Atlassian Jira (v8.3.4#803005)
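The reporter's proposed fix can be sketched with plain Maps (this is not the actual Flink canal-json code): reconstruct the before-row by taking a field from "old" whenever "old" contains that key, even when its value is null, and falling back to the after-row otherwise. The buggy logic instead treated "value is null" as "field unchanged".

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of distinguishing "field absent from old" (unchanged) from
// "field present in old with a null value" (changed, previously null).
public class CanalOldFieldMerge {
    static Map<String, Object> reconstructBefore(Map<String, Object> after,
                                                 Map<String, Object> old) {
        Map<String, Object> before = new HashMap<>(after);
        // containsKey, not "value != null": an old value may legitimately be null.
        for (String field : old.keySet()) {
            before.put(field, old.get(field));
        }
        return before;
    }

    public static void main(String[] args) {
        Map<String, Object> after = new HashMap<>();
        after.put("ymd", 20201212);
        after.put("order_no", 123);
        after.put("status", 3);

        Map<String, Object> old = new HashMap<>();
        old.put("order_no", null); // changed field whose previous value was null
        old.put("status", 1);

        Map<String, Object> before = reconstructBefore(after, old);
        // Yields -U(20201212, null, 1) instead of the buggy -U(20201212, 123, 1).
        System.out.println(before.get("order_no") + " " + before.get("status"));
    }
}
```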
[jira] [Created] (FLINK-20762) Remove unused checkpointStorage path after job finished
Liu created FLINK-20762: --- Summary: Remove unused checkpointStorage path after job finished Key: FLINK-20762 URL: https://issues.apache.org/jira/browse/FLINK-20762 Project: Flink Issue Type: Improvement Reporter: Liu

The current checkpoint structure is as follows:
{code:java}
/user-defined-checkpoint-dir
    |
    + --shared/
    + --taskowned/
    + --chk-1/
    + --chk-2/
    ...
{code}
After cancelling a job, the checkpointStorage directory is not removed. In some cases the sub-directories cannot be removed, for example when an externalized checkpoint is retained or the taskowned directory is in use. But if all the sub-directories are empty, the checkpointStorage directory could perhaps be removed; otherwise unused directories keep accumulating over time. I wonder whether this is a problem; if so, I would like to fix it. Thank you. -- This message was sent by Atlassian Jira (v8.3.4#803005)
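A hypothetical sketch of the cleanup being proposed (this is not existing Flink behavior, and the class name is made up): after job termination, delete the per-job checkpoint directory only when every entry inside it is an empty sub-directory, so a retained chk-N or a non-empty taskowned/ keeps everything in place.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Sketch: remove a checkpoint directory only when all of its
// sub-directories (shared/, taskowned/, chk-*/) are empty.
public class CheckpointDirCleanup {
    static boolean isEmptyDir(Path dir) throws IOException {
        try (DirectoryStream<Path> s = Files.newDirectoryStream(dir)) {
            return !s.iterator().hasNext();
        }
    }

    // Returns true if the checkpoint directory was safe to delete and removed.
    static boolean cleanupIfUnused(Path checkpointDir) throws IOException {
        List<Path> children = new ArrayList<>();
        try (DirectoryStream<Path> s = Files.newDirectoryStream(checkpointDir)) {
            for (Path child : s) {
                children.add(child);
            }
        }
        for (Path child : children) {
            // Anything besides an empty sub-directory (e.g. a retained
            // chk-N containing files) means we must keep everything.
            if (!Files.isDirectory(child) || !isEmptyDir(child)) {
                return false;
            }
        }
        for (Path child : children) {
            Files.delete(child);
        }
        Files.delete(checkpointDir);
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("ckpt");
        Files.createDirectories(root.resolve("shared"));
        Files.createDirectories(root.resolve("taskowned"));
        System.out.println(cleanupIfUnused(root)); // true: all sub-dirs empty
    }
}
```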
[jira] [Created] (FLINK-20761) Cannot read hive table/partition whose location path contains comma
Rui Li created FLINK-20761: -- Summary: Cannot read hive table/partition whose location path contains comma Key: FLINK-20761 URL: https://issues.apache.org/jira/browse/FLINK-20761 Project: Flink Issue Type: Bug Components: Connectors / Hive Reporter: Rui Li -- This message was sent by Atlassian Jira (v8.3.4#803005)