Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-24 Thread Yun Gao
   Hi all,

 I tested the previous PoC against the current tests and found some new
issues that might cause divergence; apologies that this may also revisit
some previously settled problems:

 1. Which operators should wait for one more checkpoint before close?

One motivation for this FLIP is to ensure the 2PC sink commits the last
part of its data before it is closed, which means the sink operator needs to
wait for one more checkpoint, following the lifecycle onEndOfInput() ->
waitForCheckpoint() -> notifyCheckpointComplete() -> close() (see the sketch
below). This leads to the question of which operators should wait for a
checkpoint. Possible options are:
 a. Make all operators (or UDFs) that implement the notifyCheckpointComplete
method wait for one more checkpoint. One exception: since we can only
snapshot none or all of the tasks of a legacy source operator to avoid data
repetition [1], we cannot let legacy source operators and their chained
operators wait for checkpoints, as there would be a deadlock if only part of
the tasks have finished; this will finally be resolved once legacy sources
are deprecated. The PoC uses this option for now.
 b. Make only operators (or UDFs) that implement a special marker interface
wait for one more checkpoint.
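As an illustration, here is a minimal sketch of the lifecycle described above
(all names are hypothetical; this is not the actual Flink runtime API): after
end of input, close() blocks until notifyCheckpointComplete() has arrived for
one more checkpoint, so the final transaction is committed before resources
are released.

{code:java}
import java.util.concurrent.CompletableFuture;

// Hypothetical model of onEndOfInput() -> waitForCheckpoint() ->
// notifyCheckpointComplete() -> close(); not the real operator interfaces.
class FinalCheckpointAwaitingSink {
    private final CompletableFuture<Long> finalCheckpoint = new CompletableFuture<>();

    void endOfInput() {
        // flush remaining records and pre-commit the last transaction
    }

    void notifyCheckpointComplete(long checkpointId) {
        // commit the pre-committed transaction, then unblock close()
        finalCheckpoint.complete(checkpointId);
    }

    void close() throws Exception {
        // wait for one more completed checkpoint before tearing down
        finalCheckpoint.get();
        // now it is safe to release resources
    }
}
{code}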


   2. Do we need to handle the case that tasks finish before the checkpoint is triggered?

  Previously I thought we could postpone this, but during testing I found
that it can cause problems: by default a checkpoint failure triggers a job
failover, and the job would also have to wait for another interval before the
next checkpoint is triggered. To pass the tests I updated the PoC to include
this part, but we may want to reconsider whether to include it or choose
some other option.

3. How to extend the checkpoint meta with a new format?

Sorry, I previously gave a wrong estimation: after extracting a sub-component
for (de)serializing operator state, I found the problem simply moves into the
new OperatorStateSerializer. The issue seems to be that v2, v3 and v4 have
different fields and thus use different (de)serialization processes, which is
a bit different from the case where we have fixed steps and each step merely
has different logic. Thus we might either
 a. use base classes shared between adjacent versions, or
 b. have a unified framework that contains all the possible fields across all
versions and uses an empty field serializer to skip the fields absent in each
version (see the sketch below).
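For option (b), a minimal sketch of what such a unified framework could look
like (all class and field names are hypothetical, not the actual Flink
serializer code): every version wires up the same field list and plugs in a
skipping reader for fields it does not carry.

{code:java}
import java.io.DataInput;
import java.io.IOException;

// Hypothetical per-field reader; the real code would be typed per field.
interface FieldReader {
    Object read(DataInput in) throws IOException;
}

// Used for fields that do not exist in a given meta version.
final class AbsentFieldReader implements FieldReader {
    public Object read(DataInput in) {
        return null; // nothing to read in this version
    }
}

final class LongFieldReader implements FieldReader {
    public Object read(DataInput in) throws IOException {
        return in.readLong();
    }
}

final class VersionedOperatorStateReader {
    private final FieldReader[] fieldReaders; // one slot per possible field

    VersionedOperatorStateReader(int version) {
        this.fieldReaders = new FieldReader[] {
            new LongFieldReader(),                                    // field present in all versions
            version >= 4 ? new LongFieldReader() : new AbsentFieldReader() // hypothetical v4-only field
        };
    }

    Object[] read(DataInput in) throws IOException {
        Object[] fields = new Object[fieldReaders.length];
        for (int i = 0; i < fields.length; i++) {
            fields[i] = fieldReaders[i].read(in); // absent fields yield null
        }
        return fields;
    }
}
{code}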

Best,
Yun

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish


--
From:Yun Gao 
Send Time:2020 Dec. 16 (Wed.) 11:07
To:Aljoscha Krettek ; dev ; user 

Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

 Hi Aljoscha,

Many thanks for the feedback! On the remaining issues:

  > 1. You mean we would insert "artificial" barriers for barrier 2 in case 
we receive  EndOfPartition while other inputs have already received barrier 2? 
I think that makes sense, yes.

  Yes, exactly. I would like to insert "artificial" barriers in case
we receive EndOfPartition while other inputs have already received barrier 2,
and also for the similar cases where some input channels receive
EndOfPartition while checkpoint 2 is ongoing, or where the task directly
receives the checkpoint trigger after all its precedent tasks have finished
but before it has received their EndOfPartition. (A sketch of the rule
follows below.)
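A minimal sketch of this rule (names are illustrative, not the actual Flink
input processor): a channel that delivers EndOfPartition during alignment is
simply counted as aligned, since it can never deliver the barrier anymore.

{code:java}
// Hypothetical alignment bookkeeping for a single checkpoint.
final class AligningInput {
    private final boolean[] aligned;
    private int pending;

    AligningInput(int numChannels) {
        this.aligned = new boolean[numChannels];
        this.pending = numChannels;
    }

    void onBarrier(int channel) {
        markAligned(channel);
    }

    void onEndOfPartition(int channel) {
        // the finished channel will never send the barrier:
        // treat EndOfPartition as an "artificial" barrier
        markAligned(channel);
    }

    private void markAligned(int channel) {
        if (!aligned[channel]) {
            aligned[channel] = true;
            if (--pending == 0) {
                // alignment complete: the task can take its snapshot
            }
        }
    }
}
{code}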

 > 3. This indeed seems complex. Maybe we could switch to using composition 
instead of inheritance to make this more extensible?

I re-checked the code and now think composition would indeed be better: it
avoids a complex inheritance hierarchy by extracting the changing part,
`(de)serializeOperatorState`, into its own component (see the sketch below).
I'll update the PoC to change this part. Many thanks for the suggestion!
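As a rough sketch of the composition (all names illustrative, not the actual
classes involved): the version-specific `(de)serializeOperatorState` step
becomes an injected component, while the surrounding steps stay in one
serializer.

{code:java}
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical version-specific component.
interface OperatorStateWriter {
    void serializeOperatorState(Object operatorState, DataOutput out) throws IOException;
}

// One metadata serializer for all versions; only the injected part varies.
final class MetadataSerializer {
    private final OperatorStateWriter operatorStateWriter;

    MetadataSerializer(OperatorStateWriter operatorStateWriter) {
        this.operatorStateWriter = operatorStateWriter; // chosen per version
    }

    void serialize(Object operatorState, DataOutput out) throws IOException {
        // ... fixed steps shared by every version would go here ...
        operatorStateWriter.serializeOperatorState(operatorState, out);
        // ... more shared steps ...
    }
}
{code}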

   > 4. Don't we currently have the same problem? Even right now source tasks 
and non-source tasks behave differently when it comes to checkpoints. Are you 
saying we should fix that or would the new work introduce even 
more duplicate code?

  Currently, since we never trigger non-source tasks, the
triggerCheckpoint logic is implemented in the base StreamTask class and is
only used by source tasks. However, after the change non-source tasks would
also get triggered, with a different behavior, so we might not be able to
keep using this pattern.

Best,
Yun


--
From:Aljoscha Krettek 
Send Time:2020 Dec. 15 (Tue.) 18:11
To:dev 
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for the thorough update! I'll answer inline.

On 14.12.20 16:33, Yun Gao wrote:
>  1. To include EndOfPartition into consideration for barrier alignment at 
> the TM side, we now tend to decouple the logic for EndOfPartition with the 
> normal 

[jira] [Created] (FLINK-20767) add nested field support for SupportsFilterPushDown

2020-12-24 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-20767:
-

 Summary: add nested field support for SupportsFilterPushDown
 Key: FLINK-20767
 URL: https://issues.apache.org/jira/browse/FLINK-20767
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.12.0
Reporter: Jun Zhang
 Fix For: 1.13.0


I think we should add nested field support for SupportsFilterPushDown; a hypothetical example follows below.
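A hypothetical example (table and column names are made up) of a query whose
nested predicate a source implementing SupportsFilterPushDown currently
cannot receive:

{code:java}
-- `user_info` is a ROW type with a nested `addr` ROW
SELECT order_id
FROM orders
WHERE user_info.addr.city = 'Hangzhou';
{code}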





[jira] [Created] (FLINK-20766) Separate the implementation of stream sort nodes

2020-12-24 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-20766:
---

 Summary: Separate the implementation of stream sort nodes
 Key: FLINK-20766
 URL: https://issues.apache.org/jira/browse/FLINK-20766
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Wenlong Lyu
 Fix For: 1.13.0








[jira] [Created] (FLINK-20765) ScalarOperatorGens doesn't set proper nullability for result type of generated expressions

2020-12-24 Thread Rui Li (Jira)
Rui Li created FLINK-20765:
--

 Summary: ScalarOperatorGens doesn't set proper nullability for 
result type of generated expressions
 Key: FLINK-20765
 URL: https://issues.apache.org/jira/browse/FLINK-20765
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Rui Li








Re: Insufficient number of network buffers after restarting

2020-12-24 Thread Yangze Guo
Hi, Yufei.

Can you reproduce this issue in 1.10.0? The deterministic slot sharing
introduced in 1.12.0 is one possible reason the problem disappears there.
Before 1.12.0, the distribution of tasks across slots was not deterministic,
so even if the network buffers are sufficient from the perspective of the
whole cluster, a bad distribution of tasks can lead to the "insufficient
network buffers" error as well.

Best,
Yangze Guo

On Fri, Dec 25, 2020 at 12:54 AM Yufei Liu  wrote:
>
> Hey,
> I’ve found that the job throws “java.io.IOException: Insufficient number of
> network buffers: required 51, but only 1 available” after a job restart, and
> I’ve observed the TM using many more network buffers than before.
> My internal branch, based on 1.10.0, can easily reproduce this, but 1.12.0
> doesn’t have this issue. I think it may have already been fixed by some PR;
> I’m curious what can lead to this problem?
>
> Best.
> YuFei.


Insufficient number of network buffers after restarting

2020-12-24 Thread Yufei Liu
Hey,
I’ve found that the job throws “java.io.IOException: Insufficient number of
network buffers: required 51, but only 1 available” after a job restart, and
I’ve observed the TM using many more network buffers than before.
My internal branch, based on 1.10.0, can easily reproduce this, but 1.12.0
doesn’t have this issue. I think it may have already been fixed by some PR;
I’m curious what can lead to this problem?

Best.
YuFei.

[jira] [Created] (FLINK-20763) canal format parse update record with null value get wrong result

2020-12-24 Thread WangRuoQi (Jira)
WangRuoQi created FLINK-20763:
-

 Summary: canal format parse update record with null value get 
wrong result
 Key: FLINK-20763
 URL: https://issues.apache.org/jira/browse/FLINK-20763
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.11.2
Reporter: WangRuoQi
 Attachments: canal_format.patch

When I use the canal format to consume a MySQL binlog and run a query like this:
{code:java}
select ymd, count(order_no), count(*) from order_table where status >= 3 group by ymd;
{code}
I get results like this:
{code:java}
(20201212,10,10)
..
(20201212,20,24)
..
(20201212,100,130)
..
{code}
I am sure that when status>=3 every record has a valid order_no, yet I got
results with different count(order_no) and count(*).

I found this while debugging.
{code:java}
insert into order_table(ymd,order_no,status) values(20201212,null,1);
-- +I(20201212,null,1)
update table order_table set order_no=123,status=3 where id=1;
-- -U(20201212,123,1)
-- +U(20201212,123,3)
{code}
So I noticed that the canal format has a bug when parsing update records.

 

The source code logic is
{code:java}
} else if (OP_UPDATE.equals(type)) {
    // "data" field is an array of rows and contains the new rows
    ArrayData data = row.getArray(0);
    // "old" field is an array of rows and contains the old values
    ArrayData old = row.getArray(1);
    for (int i = 0; i < data.size(); i++) {
        // the underlying JSON deserialization schema always produces GenericRowData
        GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
        GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
        for (int f = 0; f < fieldCount; f++) {
            if (before.isNullAt(f)) {
                // not-null fields in "old" (before) mean the fields are changed;
                // null/empty fields in "old" (before) mean the fields are not changed,
                // so we just copy the unchanged fields into before
                before.setField(f, after.getField(f));
            }
        }
        before.setRowKind(RowKind.UPDATE_BEFORE);
        after.setRowKind(RowKind.UPDATE_AFTER);
        out.collect(before);
        out.collect(after);
    }
}
{code}
When a field in the old row has a null value, it is overwritten by the value
from the new record. That leads the aggregation to a wrong result.

 

I tried to fix this bug with the following logic: for each field, use the old
value when the old row contains this field, whether it is null or not;
otherwise use the new value by default. A sketch follows below.
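A minimal sketch of that rule (this is not the attached patch;
`oldContainsField` is a hypothetical presence check that a real fix would
derive from the raw JSON, since isNullAt() cannot distinguish "absent" from
"present but null"):

{code:java}
import java.util.function.IntPredicate;
import org.apache.flink.table.data.GenericRowData;

class UpdateRowMerger {
    // Copy a field from the new row only when "old" does not contain it;
    // a field present in "old" keeps its old value, even if that value is null.
    static void mergeBefore(GenericRowData before, GenericRowData after,
                            int fieldCount, IntPredicate oldContainsField) {
        for (int f = 0; f < fieldCount; f++) {
            if (!oldContainsField.test(f)) {
                before.setField(f, after.getField(f)); // unchanged field
            }
        }
    }
}
{code}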

I hope this bug will be fixed in a future version.

[^canal_format.patch]

 





[jira] [Created] (FLINK-20762) Remove unused checkpointStorage path after job finished

2020-12-24 Thread Liu (Jira)
Liu created FLINK-20762:
---

 Summary: Remove unused checkpointStorage path after job finished
 Key: FLINK-20762
 URL: https://issues.apache.org/jira/browse/FLINK-20762
 Project: Flink
  Issue Type: Improvement
Reporter: Liu


    The current checkpoint structure is as follows:
{code:java}
/user-defined-checkpoint-dir
    |
    + --shared/
    + --taskowned/
    + --chk-1/
    + --chk-2/
    ...
{code}
    After cancelling a job, the checkpointStorage directory is not removed.
In some cases the directory cannot be removed, for example when an
externalized checkpoint is retained or taskowned state is in use.

    If all the sub-directories are empty, maybe the checkpointStorage
directory can be removed (see the sketch below); otherwise there will be more
and more unused directories as time goes on. I wonder whether this is a
problem. If so, I would like to fix it. Thank you.
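A minimal local-filesystem sketch of the idea (a real change would go through
Flink's FileSystem abstraction; class and method names are made up): remove
the job's checkpoint directory only when shared/, taskowned/ and every chk-*
directory are empty.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class CheckpointDirCleaner {

    static boolean deleteIfEmpty(Path checkpointDir) throws IOException {
        List<Path> subDirs;
        try (Stream<Path> subs = Files.list(checkpointDir)) {
            subDirs = subs.collect(Collectors.toList());
        }
        for (Path sub : subDirs) {
            if (!Files.isDirectory(sub)) {
                return false; // stray file: keep the whole directory
            }
            try (Stream<Path> entries = Files.list(sub)) {
                if (entries.findAny().isPresent()) {
                    return false; // retained state: keep everything
                }
            }
        }
        for (Path sub : subDirs) {
            Files.delete(sub); // all empty: remove sub-directories ...
        }
        Files.delete(checkpointDir); // ... and finally the root
        return true;
    }
}
{code}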





[jira] [Created] (FLINK-20761) Cannot read hive table/partition whose location path contains comma

2020-12-24 Thread Rui Li (Jira)
Rui Li created FLINK-20761:
--

 Summary: Cannot read hive table/partition whose location path 
contains comma
 Key: FLINK-20761
 URL: https://issues.apache.org/jira/browse/FLINK-20761
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Reporter: Rui Li





