Re: How to ensure that job is restored from savepoint when using Flink SQL

2020-07-08 Thread shadowell
Hi Fabian,

Thanks for your reply, it helps a lot.

Best Regards,
Jie



On 7/8/2020 18:17, Fabian Hueske wrote:
Hi Jie,


The auto-ID generation is not done by the SQL translation component but on a 
lower level, i.e., it's independent of Flink's SQL translation.

The ID generation only depends on the topology / graph structure of the 
program's operators.

The ID of an operator depends on the IDs of its predecessors (and not on its 
own processing logic or operator name).


So, as long as the operator graph structure of a program remains the same, it 
will be compatible with an earlier savepoint.
However, preserving the operator graph structure is out of the user's control.

The operator graph is automatically generated by the SQL optimizer and slight 
changes of a query can result in a different graph while other changes do not 
affect the structure.


In your example, the graph structure should remain the same because there is 
already a Filter operator (due to "where id = '001'") in the first query, and 
the second query just extends the filter predicate ("id = '001' and age >= 
'28'").
If there had been no WHERE clause in the first query, the plan might have 
changed.
To reason about which query changes are savepoint compatible, you need 
in-depth knowledge of the optimizer's translation process.
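
As a rough illustration of the ID rule described above, here is a toy Python model. It is not Flink's actual hashing code: the `assign_ids` function, the use of SHA-256, and the three simplified plans are all assumptions made for illustration, and a real optimizer plan usually contains more operators. The sketch only shows the principle that an ID derived from the graph position and the predecessors' IDs stays stable when a predicate changes, and shifts when an operator is added or removed.

```python
import hashlib


def assign_ids(operators):
    """Assign a structural ID to each operator (toy model, not Flink's code).

    `operators` is a list of (label, predecessor_indexes) in topological order.
    The label is deliberately ignored: only the graph shape feeds the hash.
    """
    ids = []
    for position, (_label, predecessors) in enumerate(operators):
        digest = hashlib.sha256()
        digest.update(str(position).encode())
        for index in predecessors:
            digest.update(ids[index].encode())
        ids.append(digest.hexdigest()[:16])
    return ids


# Hypothetical, simplified plans for the queries discussed in this thread.
old_plan = [
    ("Source: user_info", []),
    ("Filter: id", [0]),
    ("WindowAggregate: sum(salary)", [1]),
]
new_plan = [
    ("Source: user_info", []),
    ("Filter: id and age", [0]),  # same shape, different predicate
    ("WindowAggregate: sum(salary)", [1]),
]
no_filter_plan = [
    ("Source: user_info", []),
    ("WindowAggregate: sum(salary)", [0]),  # no Filter operator at all
]

old_ids = assign_ids(old_plan)
new_ids = assign_ids(new_plan)
no_filter_ids = assign_ids(no_filter_plan)

# Extending the predicate keeps the shape, so every ID is unchanged.
print(old_ids == new_ids)
# Dropping the Filter changes the aggregate's position and predecessor,
# so its ID no longer matches the one stored for the old plan.
print(old_ids[2] == no_filter_ids[1])
```

In this model the aggregation state could only be found again in the first comparison; in the second, the state saved under the old aggregate ID would have no matching operator.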


I would not rely on being able to start a query from a savepoint of a 
(slightly) modified query.

First, because it is very fragile given the query translation process, and 
second, because it can produce incorrect results.


Given your example query, I would start it from scratch and add a predicate to 
continue after the latest result of the previous query:


select id, name, sum(salary) from user_info where id = '001' and age >= '28' 
and rowtime >= 'xxx' group by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;



If the last result of the first query was for '2020-07-07', I would set xxx to 
'2020-07-08 00:00:00.000'.
Of course, this only works for queries with hard temporal boundaries, but it 
gives correct results.
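
The lower bound can be derived mechanically from the day of the last emitted window. The following Python sketch assumes daily tumbling windows, as in the query above; the helper name `restart_lower_bound` and the use of a `TIMESTAMP` literal in the predicate are assumptions for illustration, not something prescribed in this thread.

```python
from datetime import date, datetime, time, timedelta


def restart_lower_bound(last_window_day: date) -> str:
    """Return the first instant after the last completed daily window."""
    next_day = last_window_day + timedelta(days=1)
    start = datetime.combine(next_day, time.min)
    millis = start.microsecond // 1000
    return start.strftime("%Y-%m-%d %H:%M:%S.") + f"{millis:03d}"


# The last result of the old query was for 2020-07-07.
bound = restart_lower_bound(date(2020, 7, 7))

# Hypothetical predicate to append to the new query's WHERE clause.
predicate = f"rowtime >= TIMESTAMP '{bound}'"
print(predicate)  # rowtime >= TIMESTAMP '2020-07-08 00:00:00.000'
```

Because the new job starts with empty state and only reads rows from the first window the old job did not finish, no window is computed from a mix of old and new logic.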


Best, Fabian



Re: How to ensure that job is restored from savepoint when using Flink SQL

2020-07-07 Thread shadowell


Hi Fabian,


Thanks for your information!
I am still unclear about how Flink SQL auto-generates operator IDs and how 
operator state is mapped back from a savepoint.
I hope the example below will help me get some more detail.


I have two SQL queries as samples:
old SQL: select id, name, sum(salary) from user_info where id = '001' group 
by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;
new SQL: select id, name, sum(salary) from user_info where id = '001' and 
age >= '28' group by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;
I just added an age restriction in the new SQL. Now I want to switch the job 
from the old query to the new one by triggering a savepoint. Flink will 
generate operator IDs for the operators in the new SQL.
In this case, from a purely technical point of view, can the operator IDs in 
the savepoint of the old SQL job match the operator IDs in the new SQL job?
My understanding is that Flink will reorder the operators and generate new IDs 
for them, and the new IDs may not match the old ones.
This would cause some state to fail to be mapped back from the old job's 
savepoint, which naturally leads to inaccurate results.
I wonder if my understanding is correct.


Thanks~ 
Jie


On 7/7/2020 17:23, Fabian Hueske wrote:
Hi Jie Feng,


As you said, Flink translates SQL queries into streaming programs with 
auto-generated operator IDs.
In order to start a SQL query from a savepoint, the operator IDs in the 
savepoint must match the IDs in the newly translated program.
Right now this can only be guaranteed if you translate the same query with the 
same Flink version (optimizer changes might change the structure of the 
resulting plan even if the query is the same).
This is, of course, a significant limitation that the community is aware of 
and plans to improve in the future.
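
The matching requirement can be pictured as a lookup by operator ID. The Python sketch below is a simplified model, not Flink's restore code: the `restore` function, the short IDs, and the state descriptions are made up for illustration. By default Flink refuses to start a job when savepoint state cannot be mapped to any operator, unless the job is submitted with `--allowNonRestoredState`, in which case that state is dropped.

```python
def restore(savepoint_state, new_operator_ids):
    """Split savepoint state by whether its operator ID still exists.

    Returns (restored, orphaned, fresh):
      restored: state handed to operators whose ID matches
      orphaned: IDs in the savepoint with no operator in the new job
      fresh:    operators in the new job that start with empty state
    """
    restored = {
        op_id: savepoint_state[op_id]
        for op_id in new_operator_ids
        if op_id in savepoint_state
    }
    orphaned = sorted(set(savepoint_state) - set(new_operator_ids))
    fresh = [op_id for op_id in new_operator_ids if op_id not in savepoint_state]
    return restored, orphaned, fresh


# Hypothetical IDs: the source kept its ID, the aggregate got a new one.
savepoint = {"a1": "source offsets", "b2": "window sums"}
new_ids = ["a1", "c3"]

restored, orphaned, fresh = restore(savepoint, new_ids)
print(restored)  # {'a1': 'source offsets'}
print(orphaned)  # ['b2']
print(fresh)     # ['c3']
```

Here the window sums saved under `b2` are lost to the new job, and the operator `c3` starts aggregating from nothing.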


I'd also like to add that it can be very difficult to assess whether it is 
meaningful to start a query from a savepoint that was generated with a 
different query.
A savepoint holds intermediate data that is needed to compute the result of a 
query.
If you update a query, it is quite possible that the result computed by 
Flink won't be equal to the actual result of the new query.
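
A small worked example makes the discrepancy concrete. The Python sketch below models a single window of a sum aggregation; the events, the numeric age values, and the helper `window_sum` are invented for illustration. The old query sums every salary, the new query only those with age >= 28, and the savepoint is taken in the middle of the window.

```python
# (age, salary) events falling into the same window, made up for illustration.
events_before_savepoint = [(25, 100), (30, 200)]
events_after_savepoint = [(26, 50), (35, 300)]


def old_filter(age):
    return True  # old query: no age restriction


def new_filter(age):
    return age >= 28  # new query: age restriction added


def window_sum(events, keep, initial=0):
    """Add the salaries of all events passing `keep` to `initial`."""
    return initial + sum(salary for age, salary in events if keep(age))


# Partial sum stored in the savepoint, computed with the old logic.
state_in_savepoint = window_sum(events_before_savepoint, old_filter)

# New query resumed from that state: old partial sum plus newly filtered rows.
restored_result = window_sum(
    events_after_savepoint, new_filter, initial=state_in_savepoint
)

# What the new query would produce if it had seen the whole window.
correct_result = window_sum(
    events_before_savepoint + events_after_savepoint, new_filter
)

print(state_in_savepoint)  # 300
print(restored_result)     # 600
print(correct_result)      # 500
```

The restored job reports 600 although the new query's true answer for the window is 500, because the salary of the 25-year-old is already baked into the saved partial sum.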



Best, Fabian




How to ensure that job is restored from savepoint when using Flink SQL

2020-07-06 Thread shadowell


Hello, everyone,
I have a question about using Flink SQL. I hope someone can answer it or 
point me to where I can find the answer.
When using the DataStream API, you need to specify a uid for each operator so 
that the job can recover its state from a savepoint after it has been 
modified. However, when using Flink SQL, the operator uids are generated 
automatically. If the SQL logic changes (for example, the operator order 
changes), will restoring the job from a savepoint leave some operator state 
unable to be mapped back, resulting in state loss?


Thanks~
Jie Feng 

Flink SQL join usecase

2020-05-12 Thread shadowell
Hi,


I am new to Flink SQL and want to know whether Flink SQL (Flink 1.10) supports 
the following join syntax:


```
select a.id, a.col_1, b.col_1, c.col_1
from topic_a a
inner join topic_b b on a.id = b.id
left join topic_c c on a.id = c.id and a.col_1 = c.col_1 and b.col_1 = c.col_1;
```


Best Regards,
Jie Feng

