Re: flink sql reports "Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'" when joining a dimension table

2020-05-17 Post by wind.fly....@outlook.com
Yes, it should be AS PROCTIME(); that was a typo on my part. But the same error is still reported.
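For reference, a minimal sketch of the intended declaration (assuming the Flink 1.10 Blink planner; the table name, column list and connector properties below are trimmed placeholders, not the full production DDL):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ProctimeDdlSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 'proctime AS PROCTIME()' declares a computed column carrying the processing-time
        // attribute, which is what 'FOR SYSTEM_TIME AS OF l.proctime' expects on the left table.
        tEnv.sqlUpdate(
            "CREATE TABLE yanfa_log (" +
            "  dt TIMESTAMP(3)," +
            "  req_body MAP<STRING, STRING>," +  // hypothetical value types; the original DDL's type parameters were lost in the archive
            "  proctime AS PROCTIME()," +
            "  WATERMARK FOR dt AS dt - INTERVAL '60' SECOND" +
            ") WITH (" +
            "  'connector.type' = 'kafka'," +
            "  'connector.version' = '0.11'," +
            "  'connector.topic' = 'x-log-yanfa_log'," +
            "  'format.type' = 'json'" +  // remaining kafka properties omitted in this sketch
            ")");
    }
}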

From: Benchao Li 
Sent: Monday, May 18, 2020 12:59
To: user-zh 
Subject: Re: flink sql reports "Temporal table join currently only supports 'FOR
SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'" when joining a dimension table

The DDL you posted the second time also looks slightly wrong; shouldn't it be `proctime AS PROCTIME(),`?

wind.fly@outlook.com  wrote on Mon, May 18, 2020 at 9:48 AM:

> Sorry, I copied the wrong CREATE TABLE statement earlier; it should be:
> CREATE TABLE x.log.yanfa_log (
> dt TIMESTAMP(3),
> conn_id STRING,
> sequence STRING,
> trace_id STRING,
> span_info STRING,
> service_id STRING,
> msg_id STRING,
> servicename STRING,
> ret_code STRING,
> duration STRING,
> req_body MAP,
> res_body MAP,
> extra_info MAP,
> proctime PROCTIME(),
> WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = '0.11',
> 'connector.topic' = 'x-log-yanfa_log',
> 'connector.properties.bootstrap.servers' = '**:9092',
> 'connector.properties.zookeeper.connect' = '**:2181',
> 'connector.startup-mode' = 'latest-offset',
> 'update-mode' = 'append',
> 'format.type' = 'json',
> 'format.fail-on-missing-field' = 'true'
> );
> The same error is reported.
>
> -----Original Message-----
> From: wind.fly@outlook.com 
> Sent: May 18, 2020 9:45
> To: user-zh@flink.apache.org
> Subject: flink sql reports "Temporal table join currently only supports 'FOR
> SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'" when joining a dimension table
>
> Hi, all:
>    I am using flink 1.10.0 with the Blink planner, and join dimension tables with the LEFT JOIN ... FOR
> SYSTEM_TIME AS OF syntax:
>select TUMBLE_END(l.dt, INTERVAL '30' SECOND) as index_time,
> l.extra_info['cityCode'] as city_code, v.vehicle_level as vehicle_level,
> CAST(COUNT(DISTINCT req_body['driverId']) as STRING) as index_value from
> x.log.yanfa_log AS l LEFT JOIN x.saic_auth_user.t_driver FOR SYSTEM_TIME AS
> OF l.proctime AS d ON l.req_body['driverId'] = d.uid LEFT JOIN
> x.saic_cms_config.t_vehicle FOR SYSTEM_TIME AS OF l.proctime AS v ON
> d.vin=v.vehicle_vin where l.ret_code = '0' and l.servicename =
> 'MatchGtw.uploadLocationV4' and l.req_body['appId'] = 'saic_card' GROUP BY
> TUMBLE(l.dt, INTERVAL '30' SECOND), l.extra_info['cityCode'],
> v.vehicle_level;
>    The CREATE TABLE statement uses computed columns:
>CREATE TABLE x.log.yanfa_log (
> dt TIMESTAMP(3),
> conn_id STRING,
> sequence STRING,
> trace_id STRING,
> span_info STRING,
> service_id STRING,
> msg_id STRING,
> servicename STRING,
> ret_code STRING,
> duration STRING,
> req_body MAP,
> res_body MAP,
> extra_info MAP,
> proctime TIMESTAMP(3),
> WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = '0.11',
> 'connector.topic' = 'x-log-yanfa_log',
> 'connector.properties.bootstrap.servers' = '**:9092',
> 'connector.properties.zookeeper.connect' = '**:2181',
> 'connector.startup-mode' = 'latest-offset',
> 'update-mode' = 'append',
> 'format.type' = 'json',
> 'format.fail-on-missing-field' = 'true'
> );
> The following exception is thrown:
>Caused by: org.apache.flink.table.api.TableException: Temporal
> table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's
> proctime field, doesn't support 'PROCTIME()'
>at
> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>at
> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.matches(CommonLookupJoinRule.scala:118)
>at
> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.matches(CommonLookupJoinRule.scala:131)
>at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>at
> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>at
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>at
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>at
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>at
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>at
> 

Re: flink sql reports "Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'" when joining a dimension table

2020-05-17 Post by Benchao Li
The DDL you posted the second time also looks slightly wrong; shouldn't it be `proctime AS PROCTIME(),`?

wind.fly@outlook.com  wrote on Mon, May 18, 2020 at 9:48 AM:

> Sorry, I copied the wrong CREATE TABLE statement earlier; it should be:
> CREATE TABLE x.log.yanfa_log (
> dt TIMESTAMP(3),
> conn_id STRING,
> sequence STRING,
> trace_id STRING,
> span_info STRING,
> service_id STRING,
> msg_id STRING,
> servicename STRING,
> ret_code STRING,
> duration STRING,
> req_body MAP,
> res_body MAP,
> extra_info MAP,
> proctime PROCTIME(),
> WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = '0.11',
> 'connector.topic' = 'x-log-yanfa_log',
> 'connector.properties.bootstrap.servers' = '**:9092',
> 'connector.properties.zookeeper.connect' = '**:2181',
> 'connector.startup-mode' = 'latest-offset',
> 'update-mode' = 'append',
> 'format.type' = 'json',
> 'format.fail-on-missing-field' = 'true'
> );
> The same error is reported.
>
> -----Original Message-----
> From: wind.fly@outlook.com 
> Sent: May 18, 2020 9:45
> To: user-zh@flink.apache.org
> Subject: flink sql reports "Temporal table join currently only supports 'FOR
> SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'" when joining a dimension table
>
> Hi, all:
>    I am using flink 1.10.0 with the Blink planner, and join dimension tables with the LEFT JOIN ... FOR
> SYSTEM_TIME AS OF syntax:
>select TUMBLE_END(l.dt, INTERVAL '30' SECOND) as index_time,
> l.extra_info['cityCode'] as city_code, v.vehicle_level as vehicle_level,
> CAST(COUNT(DISTINCT req_body['driverId']) as STRING) as index_value from
> x.log.yanfa_log AS l LEFT JOIN x.saic_auth_user.t_driver FOR SYSTEM_TIME AS
> OF l.proctime AS d ON l.req_body['driverId'] = d.uid LEFT JOIN
> x.saic_cms_config.t_vehicle FOR SYSTEM_TIME AS OF l.proctime AS v ON
> d.vin=v.vehicle_vin where l.ret_code = '0' and l.servicename =
> 'MatchGtw.uploadLocationV4' and l.req_body['appId'] = 'saic_card' GROUP BY
> TUMBLE(l.dt, INTERVAL '30' SECOND), l.extra_info['cityCode'],
> v.vehicle_level;
>    The CREATE TABLE statement uses computed columns:
>CREATE TABLE x.log.yanfa_log (
> dt TIMESTAMP(3),
> conn_id STRING,
> sequence STRING,
> trace_id STRING,
> span_info STRING,
> service_id STRING,
> msg_id STRING,
> servicename STRING,
> ret_code STRING,
> duration STRING,
> req_body MAP,
> res_body MAP,
> extra_info MAP,
> proctime TIMESTAMP(3),
> WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = '0.11',
> 'connector.topic' = 'x-log-yanfa_log',
> 'connector.properties.bootstrap.servers' = '**:9092',
> 'connector.properties.zookeeper.connect' = '**:2181',
> 'connector.startup-mode' = 'latest-offset',
> 'update-mode' = 'append',
> 'format.type' = 'json',
> 'format.fail-on-missing-field' = 'true'
> );
> The following exception is thrown:
>Caused by: org.apache.flink.table.api.TableException: Temporal
> table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's
> proctime field, doesn't support 'PROCTIME()'
>at
> org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
>at
> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.matches(CommonLookupJoinRule.scala:118)
>at
> org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.matches(CommonLookupJoinRule.scala:131)
>at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
>at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
>at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
>at
> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
>at
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
>at
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>at
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
>at
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
>at
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
>at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
>at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
>at
> 

Re: [ANNOUNCE] Apache Flink 1.10.1 released

2020-05-17 Post by Leonard Xu

Thanks Yu for being the release manager, and everyone else who made this 
possible.

Best,
Leonard Xu

> On May 18, 2020, at 10:43, Zhu Zhu wrote:
> 
> Thanks Yu for being the release manager. Thanks everyone who made this 
> release possible!
> 
> Thanks,
> Zhu Zhu
> 
> Benchao Li <libenc...@gmail.com> wrote on Fri, May 15, 2020 at 7:51 PM:
> Thanks Yu for the great work, and everyone else who made this possible.
> 
> Dian Fu <dian0511...@gmail.com> wrote on Fri, May 15, 2020 at 6:55 PM:
> Thanks Yu for managing this release and everyone else who made this release 
> possible. Good work!
> 
> Regards,
> Dian
> 
>> On May 15, 2020, at 6:26 PM, Till Rohrmann wrote:
>> 
>> Thanks Yu for being our release manager and everyone else who made the 
>> release possible!
>> 
>> Cheers,
>> Till
>> 
>> On Fri, May 15, 2020 at 9:15 AM Congxian Qiu wrote:
>> Thanks a lot for the release and your great job, Yu!
>> Also thanks to everyone who made this release possible!
>> 
>> Best,
>> Congxian
>> 
>> 
>> Yu Li <car...@gmail.com> wrote on Thu, May 14, 2020 at 1:59 AM:
>> The Apache Flink community is very happy to announce the release of Apache 
>> Flink 1.10.1, which is the first bugfix release for the Apache Flink 1.10 
>> series.
>> 
>> Apache Flink® is an open-source stream processing framework for distributed, 
>> high-performing, always-available, and accurate data streaming applications.
>> 
>> The release is available for download at:
>> https://flink.apache.org/downloads.html 
>> 
>> 
>> Please check out the release blog post for an overview of the improvements 
>> for this bugfix release:
>> https://flink.apache.org/news/2020/05/12/release-1.10.1.html 
>> 
>> 
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346891
>>  
>> 
>> 
>> We would like to thank all contributors of the Apache Flink community who 
>> made this release possible!
>> 
>> Regards,
>> Yu
> 
> 
> 
> -- 
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com ; libenc...@pku.edu.cn 
> 


Re: [ANNOUNCE] Apache Flink 1.10.1 released

2020-05-17 Post by Zhu Zhu
Thanks Yu for being the release manager. Thanks everyone who made this
release possible!

Thanks,
Zhu Zhu

Benchao Li  wrote on Fri, May 15, 2020 at 7:51 PM:

> Thanks Yu for the great work, and everyone else who made this possible.
>
> Dian Fu  wrote on Fri, May 15, 2020 at 6:55 PM:
>
>> Thanks Yu for managing this release and everyone else who made this
>> release possible. Good work!
>>
>> Regards,
>> Dian
>>
>> On May 15, 2020, at 6:26 PM, Till Rohrmann wrote:
>>
>> Thanks Yu for being our release manager and everyone else who made the
>> release possible!
>>
>> Cheers,
>> Till
>>
>> On Fri, May 15, 2020 at 9:15 AM Congxian Qiu 
>> wrote:
>>
>>> Thanks a lot for the release and your great job, Yu!
>>> Also thanks to everyone who made this release possible!
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Yu Li  wrote on Thu, May 14, 2020 at 1:59 AM:
>>>
 The Apache Flink community is very happy to announce the release of
 Apache Flink 1.10.1, which is the first bugfix release for the Apache Flink
 1.10 series.

 Apache Flink® is an open-source stream processing framework for
 distributed, high-performing, always-available, and accurate data streaming
 applications.

 The release is available for download at:
 https://flink.apache.org/downloads.html

 Please check out the release blog post for an overview of the
 improvements for this bugfix release:
 https://flink.apache.org/news/2020/05/12/release-1.10.1.html

 The full release notes are available in Jira:

 https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346891

 We would like to thank all contributors of the Apache Flink community
 who made this release possible!

 Regards,
 Yu

>>>
>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: Re: When using SQL, the idle state retention time is set, but the state still keeps growing

2020-05-17 Post by claylin
The Flink version is 1.10, and the Kafka traffic is about 2-5 MB/s.




------------------ Original Message ------------------
From: "LakeShen"

Hi,

Could you describe your Flink version? For the exact meaning of the idle state retention time, please refer to [1]:

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time

Best,
LakeShen



claylin <1012539...@qq.com> wrote on Sun, May 17, 2020 at 10:24 PM:

> The retention time is 10-15 minutes. Since I key by the minute, the state should expire quickly; the Kafka data volume is about 2-5 MB per second.


> ------------------ Original Message ------------------
> https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4
 


run flink on edge vs hub

2020-05-17 Post by Eleanore Jin
Hi Community,

Currently we are running Flink in 'hub' data centers where data is ingested
into the platform via Kafka, and the Flink job reads from Kafka, does the
transformations, and publishes to another Kafka topic.

I would also like to see if the same logic (read input message -> do
transformation -> return output message) can be applied on 'edge' data
centers.

The requirement for running on 'edge' is to return the response synchronously,
like a synchronous HTTP-based request/response.

Can you please provide some guidance/thoughts on this?

Thanks a lot!
Eleanore


Re: Advice on a savepoint disaster recovery solution

2020-05-17 Post by Congxian Qiu
Hi,

As far as I know, this is not easy to support inside Flink itself; you could check whether the storage layer can meet a similar requirement. Also, FLINK-5763[1]
may be useful to you: FLINK-5763 provides better support for migrating savepoints between clusters.

[1] https://issues.apache.org/jira/browse/FLINK-5763
Best,
Congxian


1048262223 <1048262...@qq.com> wrote on Sun, May 17, 2020 at 9:17 PM:

> +1. If both primary and standby are handled inside Flink, it could double the checkpointing load; my understanding is that syncing between the primary and standby clusters directly inside the state backend (storage) would be more efficient.
>
>
>
>
> ------------------ Original Message ------------------
> From: tison  Sent: May 17, 2020 20:50
> To: user-zh  Subject: Re: Advice on a savepoint disaster recovery solution
>
>
>
> As I understand it, this is not really within Flink's scope. You store the savepoint in one location and then attach an external synchronizer that keeps the (savepoint
> directory) in sync between the primary cluster and the DR cluster; that should do it.
>
> Best,
> tison.
>
>
> zhisheng  wrote on Sun, May 17, 2020 at 8:40 PM:
>  hi
> 
>  If, when taking a checkpoint or savepoint, you could specify the paths of two HDFS clusters (one for your primary cluster and one for the DR cluster),
>  would that solve your current problem and meet your requirement?
> 
>  Best
> 
>  zhisheng
> 
>  请叫我雷锋 <854194...@qq.com> wrote on Sun, May 17, 2020 at 7:32 PM:
> 
>   Thanks for following up:
>  
>  
>   By savepoint disaster recovery
> I mean that the files generated by each savepoint run can be backed up on the disaster recovery cluster. When the primary cluster becomes unavailable, the job can be migrated to the DR
>   cluster and restored from that savepoint.
>  
>  
>   ------------------ Original Message ------------------
>   From: "Congxian Qiu"   Sent: Sunday, May 17, 2020 6:01 PM
>   To: "user-zh"  
>   Subject: Re: Advice on a savepoint disaster recovery solution
>  
>  
>  
>   Hi
>  
>   What exactly does "disaster recovery" mean here for savepoints? What problem do you hope to solve?
>  
>   Best,
>   Congxian
>  
>  
>   LakeShen  wrote on Fri, May 15, 2020 at 10:20 AM:
>  
>   > Hi ,
>   >
>   > Could you describe your scenario in more detail?
>   >
>   > Best,
>   > LakeShen
>   >
>   > 请叫我雷锋 <854194...@qq.com> wrote on Thu, May 14, 2020 at 9:42 PM:
>   >
>   > > Hi all, is there any good savepoint disaster recovery solution?
>   > >
>   > >
>   > >
>   > > Sent from my iPhone
>   >
> 


Re: Re: When using SQL, the idle state retention time is set, but the state still keeps growing

2020-05-17 Post by LakeShen
Hi,

Could you describe your Flink version? For the exact meaning of the idle state retention time, please refer to [1]:

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time

Best,
LakeShen
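(A minimal, hedged sketch of the setting referenced above, against the Flink 1.10 Table API; the 10/15 minute values simply mirror the numbers mentioned later in this thread:)

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleStateRetentionSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // State that has not been updated for at least 10 minutes becomes eligible for cleanup;
        // it is guaranteed to be removed no later than 15 minutes after its last update.
        tEnv.getConfig().setIdleStateRetentionTime(Time.minutes(10), Time.minutes(15));
    }
}

Note that the retention clock restarts every time a key's state is updated, so keys that keep receiving data never expire.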



claylin <1012539...@qq.com> wrote on Sun, May 17, 2020 at 10:24 PM:

> The retention time is 10-15 minutes. Since I key by the minute, the state should expire quickly; the Kafka data volume is about 2-5 MB per second.
>
>
> ------------------ Original Message ------------------
> From: "刘大龙"  Sent: Sunday, May 17, 2020 10:14 PM
> To: "user-zh"
> Subject: Re: Re: When using SQL, the idle state retention time is set, but the state still keeps growing
>
>
>
> Hi,
>  How long did you set the state retention time to? For a plain GROUP BY
> aggregation operator, expired state is currently cleaned up via timers; the exact expiration time is the state's last update time plus the minimum idle time you configured, so state that keeps being updated never expires. Also, how much data is in Kafka, e.g. roughly how many records per second? You could try setting a shorter retention time and observe whether the state then stays reasonably stable instead of growing.
>
>
>  -----Original Message-----
>  From: claylin <1012539...@qq.com>
>  Sent: 2020-05-17 17:41:13 (Sunday)
>  To: user-zh   Cc:
>  Subject: Re: When using SQL, the idle state retention time is set, but the state still keeps growing
> 
>  The link is here:
> https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4
> 
>
>
> 
> 
> 
>  ------------------ Original Message ------------------
>  From: "tison"  Sent: Sunday, May 17, 2020 5:34 PM
>  To: "user-zh" 
>  Subject: Re: When using SQL, the idle state retention time is set, but the state still keeps growing
> 
> 
> 
>  Consider posting the SQL as a gist link?
> 
>  Best,
>  tison.
> 
> 
>  claylin <1012539...@qq.com> wrote on Sun, May 17, 2020 at 5:32 PM:
> 
>  > The SQL job is defined as follows; the minimum and maximum idle time are also set via TableConfig. But after the job has been running for a long
>  > time, the state under the sst directory flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9 keeps growing, the job threads spend a lot of time reading and writing state, and the job ends up constantly backpressured. Any advice would be much appreciated.
>  >
>  > CREATE TABLE yy_yapmnetwork_original (
>  >     happenAt BIGINT, uid BIGINT, appId STRING, deviceId STRING, appVer STRING, dnsDur BIGINT,
>  >     useGlb INT, hitCache INT, requestSize DOUBLE, responseSize DOUBLE, totalDur BIGINT,
>  >     url STRING, statusCode INT, prototype STRING, netType STRING, traceId STRING,
>  >     ts AS CAST(FROM_UNIXTIME(happenAt/1000) AS TIMESTAMP(3)),
>  >     WATERMARK FOR ts AS ts - INTERVAL '20' SECOND
>  > ) with (
>  >     'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'yapm_metrics',
>  >     'connector.properties.zookeeper.connect' = 'localhost:2181',
>  >     'connector.properties.bootstrap.servers' = 'kafkawx007-core001.yy.com:8101,kafkawx007-core002.yy.com:8101,kafkawx007-core003.yy.com:8101',
>  >     'connector.properties.group.id' = 'interface_success_rate_consumer',
>  >     'connector.startup-mode' = 'latest-offset', 'format.type' = 'json'
>  > );
>  > create table request_latency_tbl (
>  >     app_id string, app_ver string, net_type string, prototype string, url string, status_code int,
>  >     w_start string, success_cnt BIGINT, failure_cnt BIGINT, total_cnt BIGINT
>  > ) with (
>  >     'connector.type' = 'jdbc',
>  >     'connector.url' = 'jdbc:mysql://localhost:3315/yapm_metrics?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnect=true',
>  >     'connector.table' = 'request_latency_statistics', 'connector.username' = 'yapm_metrics',
>  >     'connector.password' = '1234456', 'connector.write.flush.max-rows' = '1000',
>  >     'connector.write.flush.interval' = '5s', 'connector.write.max-retries' = '2'
>  > );
>  > create view request_1minutes_latency as
>  >     select appId, appVer, netType, prototype, url, statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm') w_start,
>  >         count(distinct traceId) filter (where statusCode in (200)) as successCnt,
>  >         count(distinct traceId) filter (where statusCode not in (200)) as failureCnt,
>  >         count(distinct traceId) as total_cnt
>  >     from yy_yapmnetwork_original
>  >     group by appId, appVer, netType, prototype, url, statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm');
>  > insert into request_latency_tbl select * from request_1minutes_latency;
>
>
> --
> 刘大龙
>
> Zhejiang University, Department of Control, Institute of Intelligent Systems and Control, Industrial Control Building 217
> Address: Yuquan Campus, Zhejiang University, 38 Zheda Road, Hangzhou, Zhejiang
> Tel:18867547281


Re: Source code of flink built-in UDFs

2020-05-17 Post by venn
Many thanks for the patient reply.

-----Original Message-----
From: user-zh-return-3567-wxchunjhyy=163@flink.apache.org 
 on behalf of Benchao Li
Sent: May 16, 2020 21:50
To: user-zh 
Subject: Re: Source code of flink built-in UDFs

Hi,

Flink's built-in functions are implemented differently from UDFs; many of them are implemented directly via code generation.

Taking the blink planner as an example, the rough flow is:
1. The class FlinkSqlOperatorTable holds the table of built-in functions. Calcite uses this table when parsing
SQL, so these functions are resolved directly to concrete function definitions.
2.
Then, during the code generation phase, these functions are recognized and, depending on the function definition, different function-call implementations are generated. You can look at the code under the `org.apache.flink.table.planner.codegen.calls` package for this part.
3. Point 2 above mainly covers how scalar functions are generated; aggregate
functions are a bit more special, and you can refer to the code under the `org.apache.flink.table.planner.functions.aggfunctions` package for that part.
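(For contrast with the code-generated built-ins described above: a user-defined scalar function is a plain class whose eval() the planner calls, rather than something resolved through FlinkSqlOperatorTable and codegen. A minimal sketch, with a made-up function name, assuming Flink 1.10:)

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class UdfVsBuiltinSketch {

    // A hypothetical user-defined function: unlike built-ins such as CONCAT,
    // its logic lives in this eval() method rather than in generated code.
    public static class MyUpper extends ScalarFunction {
        public String eval(String s) {
            return s == null ? null : s.toUpperCase();
        }
    }

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Register it so SQL can call MY_UPPER(...); built-ins need no registration
        // because they are listed in FlinkSqlOperatorTable and handled by codegen.
        tEnv.registerFunction("MY_UPPER", new MyUpper());
    }
}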


venn  wrote on Sat, May 16, 2020 at 3:53 PM:

> Hi everyone, where is the source code of Flink's built-in UDFs located, and where are those functions
> registered? Many thanks for any replies.
>
>
>
> Thanks a lot !
>
>
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: flink sql reports "Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'" when joining a dimension table

2020-05-17 Post by wind.fly....@outlook.com
Sorry, I copied the wrong CREATE TABLE statement earlier; it should be:
CREATE TABLE x.log.yanfa_log (
dt TIMESTAMP(3),
conn_id STRING,
sequence STRING,
trace_id STRING,
span_info STRING,
service_id STRING,
msg_id STRING,
servicename STRING,
ret_code STRING,
duration STRING,
req_body MAP, 
res_body MAP,
extra_info MAP,
proctime PROCTIME(),
WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'x-log-yanfa_log',
'connector.properties.bootstrap.servers' = '**:9092',
'connector.properties.zookeeper.connect' = '**:2181',
'connector.startup-mode' = 'latest-offset',
'update-mode' = 'append',
'format.type' = 'json',
'format.fail-on-missing-field' = 'true'
);
The same error is reported.

-----Original Message-----
From: wind.fly@outlook.com 
Sent: May 18, 2020 9:45
To: user-zh@flink.apache.org
Subject: flink sql reports "Temporal table join currently only supports 'FOR
SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'" when joining a dimension table

Hi, all:
   I am using flink 1.10.0 with the Blink planner, and join dimension tables with the LEFT JOIN ... FOR SYSTEM_TIME AS
OF syntax:
   select TUMBLE_END(l.dt, INTERVAL '30' SECOND) as index_time, 
l.extra_info['cityCode'] as city_code, v.vehicle_level as vehicle_level, 
CAST(COUNT(DISTINCT req_body['driverId']) as STRING) as index_value from 
x.log.yanfa_log AS l LEFT JOIN x.saic_auth_user.t_driver FOR SYSTEM_TIME AS OF 
l.proctime AS d ON l.req_body['driverId'] = d.uid LEFT JOIN 
x.saic_cms_config.t_vehicle FOR SYSTEM_TIME AS OF l.proctime AS v ON 
d.vin=v.vehicle_vin where l.ret_code = '0' and l.servicename = 
'MatchGtw.uploadLocationV4' and l.req_body['appId'] = 'saic_card' GROUP BY 
TUMBLE(l.dt, INTERVAL '30' SECOND), l.extra_info['cityCode'], v.vehicle_level;
   The CREATE TABLE statement uses computed columns:
   CREATE TABLE x.log.yanfa_log (
dt TIMESTAMP(3),
conn_id STRING,
sequence STRING,
trace_id STRING,
span_info STRING,
service_id STRING,
msg_id STRING,
servicename STRING,
ret_code STRING,
duration STRING,
req_body MAP,
res_body MAP,
extra_info MAP,
proctime TIMESTAMP(3),
WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'x-log-yanfa_log',
'connector.properties.bootstrap.servers' = '**:9092',
'connector.properties.zookeeper.connect' = '**:2181',
'connector.startup-mode' = 'latest-offset',
'update-mode' = 'append',
'format.type' = 'json',
'format.fail-on-missing-field' = 'true'
);
The following exception is thrown:
   Caused by: org.apache.flink.table.api.TableException: Temporal table 
join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime 
field, doesn't support 'PROCTIME()'
   at 
org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
   at 
org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.matches(CommonLookupJoinRule.scala:118)
   at 
org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.matches(CommonLookupJoinRule.scala:131)
   at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
   at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
   at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
   at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
   at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
   at 
org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
   at 
org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
   at 

flink sql reports "Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'" when joining a dimension table

2020-05-17 Post by wind.fly....@outlook.com
Hi, all:
   I am using flink 1.10.0 with the Blink planner, and join dimension tables with the LEFT JOIN ... FOR SYSTEM_TIME AS
OF syntax:
   select TUMBLE_END(l.dt, INTERVAL '30' SECOND) as index_time, 
l.extra_info['cityCode'] as city_code, v.vehicle_level as vehicle_level, 
CAST(COUNT(DISTINCT req_body['driverId']) as STRING) as index_value from 
x.log.yanfa_log AS l LEFT JOIN x.saic_auth_user.t_driver FOR SYSTEM_TIME AS OF 
l.proctime AS d ON l.req_body['driverId'] = d.uid LEFT JOIN 
x.saic_cms_config.t_vehicle FOR SYSTEM_TIME AS OF l.proctime AS v ON 
d.vin=v.vehicle_vin where l.ret_code = '0' and l.servicename = 
'MatchGtw.uploadLocationV4' and l.req_body['appId'] = 'saic_card' GROUP BY 
TUMBLE(l.dt, INTERVAL '30' SECOND), l.extra_info['cityCode'], v.vehicle_level;
   The CREATE TABLE statement uses computed columns:
   CREATE TABLE x.log.yanfa_log (
dt TIMESTAMP(3),
conn_id STRING,
sequence STRING,
trace_id STRING,
span_info STRING,
service_id STRING,
msg_id STRING,
servicename STRING,
ret_code STRING,
duration STRING,
req_body MAP,
res_body MAP,
extra_info MAP,
proctime TIMESTAMP(3),
WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'x-log-yanfa_log',
'connector.properties.bootstrap.servers' = '**:9092',
'connector.properties.zookeeper.connect' = '**:2181',
'connector.startup-mode' = 'latest-offset',
'update-mode' = 'append',
'format.type' = 'json',
'format.fail-on-missing-field' = 'true'
);
The following exception is thrown:
   Caused by: org.apache.flink.table.api.TableException: Temporal table 
join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime 
field, doesn't support 'PROCTIME()'
   at 
org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
   at 
org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.matches(CommonLookupJoinRule.scala:118)
   at 
org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.matches(CommonLookupJoinRule.scala:131)
   at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
   at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
   at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
   at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
   at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
   at 
org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
   at 
org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
   at 
org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
   at 
org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
   at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
   at 

Re: Re: When using SQL, the idle state retention time is set, but the state still keeps growing

2020-05-17 Post by claylin
The retention time is 10-15 minutes. Since I key by the minute, the state should expire quickly; the Kafka data volume is about 2-5 MB per second.


------------------ Original Message ------------------
From: "刘大龙"
https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4




 ------------------ Original Message ------------------
 From: "tison"

Re: Re: When using SQL, the idle state retention time is set, but the state still keeps growing

2020-05-17 Post by 刘大龙
Hi,
   How long did you set the state retention time to? For a plain GROUP BY
aggregation operator, expired state is currently cleaned up via timers; the exact expiration time is the state's last update time plus the minimum idle time you configured, so state that keeps being updated never expires. Also, how much data is in Kafka, e.g. roughly how many records per second? You could try setting a shorter retention time and observe whether the state then stays reasonably stable instead of growing.
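(Not the planner's actual implementation, but a rough user-code analog of the timer-based cleanup described above, where the effective expiration time is "last update + minimum retention"; the class, state names and the 10-minute value are illustrative only:)

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class IdleCleanupAnalog extends KeyedProcessFunction<String, Long, Long> {

    private static final long RETENTION_MS = 10 * 60 * 1000L; // hypothetical 10 min minimum idle time

    private transient ValueState<Long> sum;
    private transient ValueState<Long> lastUpdate;

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
        lastUpdate = getRuntimeContext().getState(new ValueStateDescriptor<>("lastUpdate", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        Long current = sum.value();
        sum.update(current == null ? value : current + value);

        // Every update pushes the expiration out to "now + retention".
        long now = ctx.timerService().currentProcessingTime();
        lastUpdate.update(now);
        ctx.timerService().registerProcessingTimeTimer(now + RETENTION_MS);
        out.collect(sum.value());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        Long last = lastUpdate.value();
        // Only clear the state if the key has really been idle for the full retention period;
        // a key that keeps receiving data is therefore never cleaned up.
        if (last != null && timestamp >= last + RETENTION_MS) {
            sum.clear();
            lastUpdate.clear();
        }
    }
}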


> -----Original Message-----
> From: claylin <1012539...@qq.com>
> Sent: 2020-05-17 17:41:13 (Sunday)
> To: user-zh 
> Cc: 
> Subject: Re: When using SQL, the idle state retention time is set, but the state still keeps growing
> 
> The link is here: https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4
> 
> 
> 
> 
> ------------------ Original Message ------------------
> From: "tison"  Sent: Sunday, May 17, 2020 5:34 PM
> To: "user-zh" 
> Subject: Re: When using SQL, the idle state retention time is set, but the state still keeps growing
> 
> 
> 
> Consider posting the SQL as a gist link?
> 
> Best,
> tison.
> 
> 
> claylin <1012539...@qq.com> wrote on Sun, May 17, 2020 at 5:32 PM:
> 
>  The SQL job is defined as follows; the minimum and maximum idle time are also set via TableConfig. But after the job has been running for a long
>  time, the state under the sst directory flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9 keeps growing, the job threads spend a lot of time reading and writing state, and the job ends up constantly backpressured. Any advice would be much appreciated.
> 
>  CREATE TABLE yy_yapmnetwork_original (
>      happenAt BIGINT, uid BIGINT, appId STRING, deviceId STRING, appVer STRING, dnsDur BIGINT,
>      useGlb INT, hitCache INT, requestSize DOUBLE, responseSize DOUBLE, totalDur BIGINT,
>      url STRING, statusCode INT, prototype STRING, netType STRING, traceId STRING,
>      ts AS CAST(FROM_UNIXTIME(happenAt/1000) AS TIMESTAMP(3)),
>      WATERMARK FOR ts AS ts - INTERVAL '20' SECOND
>  ) with (
>      'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'yapm_metrics',
>      'connector.properties.zookeeper.connect' = 'localhost:2181',
>      'connector.properties.bootstrap.servers' = 'kafkawx007-core001.yy.com:8101,kafkawx007-core002.yy.com:8101,kafkawx007-core003.yy.com:8101',
>      'connector.properties.group.id' = 'interface_success_rate_consumer',
>      'connector.startup-mode' = 'latest-offset', 'format.type' = 'json'
>  );
>  create table request_latency_tbl (
>      app_id string, app_ver string, net_type string, prototype string, url string, status_code int,
>      w_start string, success_cnt BIGINT, failure_cnt BIGINT, total_cnt BIGINT
>  ) with (
>      'connector.type' = 'jdbc',
>      'connector.url' = 'jdbc:mysql://localhost:3315/yapm_metrics?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnect=true',
>      'connector.table' = 'request_latency_statistics', 'connector.username' = 'yapm_metrics',
>      'connector.password' = '1234456', 'connector.write.flush.max-rows' = '1000',
>      'connector.write.flush.interval' = '5s', 'connector.write.max-retries' = '2'
>  );
>  create view request_1minutes_latency as
>      select appId, appVer, netType, prototype, url, statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm') w_start,
>          count(distinct traceId) filter (where statusCode in (200)) as successCnt,
>          count(distinct traceId) filter (where statusCode not in (200)) as failureCnt,
>          count(distinct traceId) as total_cnt
>      from yy_yapmnetwork_original
>      group by appId, appVer, netType, prototype, url, statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm');
>  insert into request_latency_tbl select * from request_1minutes_latency;


--
刘大龙

Zhejiang University, Department of Control, Institute of Intelligent Systems and Control, Industrial Control Building 217
Address: Yuquan Campus, Zhejiang University, 38 Zheda Road, Hangzhou, Zhejiang
Tel:18867547281


Re: Advice on a savepoint disaster recovery solution

2020-05-17 Post by 1048262223
+1. If both primary and standby are handled inside Flink, it could double the checkpointing load; my understanding is that syncing between the primary and standby clusters directly inside the state backend (storage) would be more efficient.




------------------ Original Message ------------------
From: tison 

Re: Advice on a savepoint disaster recovery solution

2020-05-17 Post by tison
As I understand it, this is not really within Flink's scope. You store the savepoint in one location and then attach an external synchronizer that keeps the (savepoint
directory) in sync between the primary cluster and the DR cluster; that should do it.

Best,
tison.
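(A rough sketch of that setup, assuming an HDFS checkpoint/savepoint directory on the primary cluster; the paths are placeholders and the sync tool, e.g. hadoop distcp, is external to Flink:)

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SavepointDirSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoints are written under this primary-cluster directory; savepoints can be
        // pointed at a sibling directory when triggered, e.g.
        //   bin/flink savepoint <jobId> hdfs://primary-nn:8020/flink/savepoints
        env.setStateBackend(new FsStateBackend("hdfs://primary-nn:8020/flink/checkpoints"));

        // Disaster recovery is then handled outside Flink, e.g. a periodic
        //   hadoop distcp hdfs://primary-nn:8020/flink/savepoints hdfs://dr-nn:8020/flink/savepoints
        // so the DR cluster can restore the job from the mirrored savepoint files.
    }
}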


zhisheng  wrote on Sun, May 17, 2020 at 8:40 PM:

> hi
>
> If, when taking a checkpoint or savepoint, you could specify the paths of two HDFS clusters (one for your primary cluster and one for the DR cluster),
> would that solve your current problem and meet your requirement?
>
> Best
>
> zhisheng
>
> 请叫我雷锋 <854194...@qq.com> wrote on Sun, May 17, 2020 at 7:32 PM:
>
> > Thanks for following up:
> >
> >
> > By savepoint disaster recovery I mean that the files generated by each savepoint run can be backed up on the disaster recovery cluster. When the primary cluster becomes unavailable, the job can be migrated to the DR
> > cluster and restored from that savepoint.
> >
> >
> > ------------------ Original Message ------------------
> > From: "Congxian Qiu"  Sent: Sunday, May 17, 2020 6:01 PM
> > To: "user-zh"
> > Subject: Re: Advice on a savepoint disaster recovery solution
> >
> >
> >
> > Hi
> >
> > What exactly does "disaster recovery" mean here for savepoints? What problem do you hope to solve?
> >
> > Best,
> > Congxian
> >
> >
> > LakeShen  wrote on Fri, May 15, 2020 at 10:20 AM:
> >  Hi ,
> > 
> >  Could you describe your scenario in more detail?
> > 
> >  Best,
> >  LakeShen
> > 
> >  请叫我雷锋 <854194...@qq.com> wrote on Thu, May 14, 2020 at 9:42 PM:
> > 
> >   Hi all, is there any good savepoint disaster recovery solution?
> >  
> >  
> >  
> >   Sent from my iPhone
> > 
>


Re: Advice on a savepoint disaster recovery solution

2020-05-17 Post by zhisheng
hi

If, when taking a checkpoint or savepoint, you could specify the paths of two HDFS clusters (one for your primary cluster and one for the DR cluster),
would that solve your current problem and meet your requirement?

Best

zhisheng

请叫我雷锋 <854194...@qq.com> wrote on Sun, May 17, 2020 at 7:32 PM:

> Thanks for following up:
>
>
> By savepoint disaster recovery I mean that the files generated by each savepoint run can be backed up on the disaster recovery cluster. When the primary cluster becomes unavailable, the job can be migrated to the DR
> cluster and restored from that savepoint.
>
>
> ------------------ Original Message ------------------
> From: "Congxian Qiu"  Sent: Sunday, May 17, 2020 6:01 PM
> To: "user-zh"
> Subject: Re: Advice on a savepoint disaster recovery solution
>
>
>
> Hi
>
> What exactly does "disaster recovery" mean here for savepoints? What problem do you hope to solve?
>
> Best,
> Congxian
>
>
> LakeShen  wrote on Fri, May 15, 2020 at 10:20 AM:
>  Hi ,
> 
>  Could you describe your scenario in more detail?
> 
>  Best,
>  LakeShen
> 
>  请叫我雷锋 <854194...@qq.com> wrote on Thu, May 14, 2020 at 9:42 PM:
> 
>   Hi all, is there any good savepoint disaster recovery solution?
>  
>  
>  
>   Sent from my iPhone
> 


Re: Advice on a savepoint disaster recovery solution

2020-05-17 Post by 请叫我雷锋
Thanks for following up:


By savepoint disaster recovery I mean that the files generated by each
savepoint run can be backed up on the disaster recovery cluster. When the primary cluster becomes unavailable,
the job can be migrated to the DR cluster and restored from that savepoint.


------------------ Original Message ------------------
From: "Congxian Qiu"

Re: Advice on a savepoint disaster recovery solution

2020-05-17 Post by Congxian Qiu
Hi

What exactly does "disaster recovery" mean here for savepoints? What problem do you hope to solve?

Best,
Congxian


LakeShen  wrote on Fri, May 15, 2020 at 10:20 AM:

> Hi ,
>
> Could you describe your scenario in more detail?
>
> Best,
> LakeShen
>
> 请叫我雷锋 <854194...@qq.com> wrote on Thu, May 14, 2020 at 9:42 PM:
>
> > Hi all, is there any good savepoint disaster recovery solution?
> >
> >
> >
> > Sent from my iPhone
>


Re: When using SQL, the idle state retention time is set, but the state still keeps growing

2020-05-17 Post by claylin
The link is here: https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4




------------------ Original Message ------------------
From: "tison"

Re: When using SQL, the idle state retention time is set, but the state still keeps growing

2020-05-17 Post by tison
Consider posting the SQL as a gist link?

Best,
tison.


claylin <1012539...@qq.com> wrote on Sun, May 17, 2020 at 5:32 PM:

> The SQL job is defined as follows; the minimum and maximum idle time are also set via TableConfig. But after running for a long
> time, the state under the sst directory flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9 keeps growing, the job threads spend a lot of time reading and writing state, and the job ends up constantly backpressured. Any advice would be much appreciated. CREATE
> TABLE yy_yapmnetwork_original ( happenAt BIGINT, uid BIGINT,
>  appId STRING, deviceId STRING, appVer STRING, dnsDur BIGINT,
>useGlb INT, hitCache INT, requestSize DOUBLE, responseSize
> DOUBLE, totalDur BIGINT, url STRING, statusCode INT,
>  prototype STRING, netType STRING, traceId STRING, ts AS
> CAST(FROM_UNIXTIME(happenAt/1000) AS TIMESTAMP(3)), WATERMARK FOR ts AS
> ts - INTERVAL '20' SECOND )with ( 'connector.type' = 'kafka',
> 'connector.version' = 'universal', 'connector.topic' = 'yapm_metrics',
> 'connector.properties.zookeeper.connect' = 'localhost:2181',
> 'connector.properties.bootstrap.servers' = 'kafkawx007-core001.yy.com:8101
> ,kafkawx007-core002.yy.com:8101,kafkawx007-core003.yy.com:8101', '
> connector.properties.group.id' = 'interface_success_rate_consumer',
> 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' );
> create table request_latency_tbl ( app_id string, app_ver string,
>net_type string, prototype string, url string, status_code
> int, w_start string, success_cnt BIGINT, failure_cnt BIGINT,
>  total_cnt BIGINT ) with( 'connector.type' = 'jdbc', 'connector.url' =
> 'jdbc:mysql://localhost:3315/yapm_metrics?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnect=true',
> 'connector.table' = 'request_latency_statistics', 'connector.username' =
> 'yapm_metrics', 'connector.password' = '1234456',
> 'connector.write.flush.max-rows' = '1000', 'connector.write.flush.interval'
> = '5s', 'connector.write.max-retries' = '2' ); create view
> request_1minutes_latency  as select appId, appVer, netType, prototype,
> url, statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm') w_start,
>  count(distinct traceId) filter (where statusCode in (200)) as successCnt,
>count(distinct traceId) filter (where statusCode not in (200)) as
> failureCnt, count(distinct traceId) as total_cnt from
> yy_yapmnetwork_original group by appId, appVer, netType, prototype, url,
> statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm'); insert into
> request_latency_tbl select * from  request_1minutes_latency;


When using SQL, the idle state retention time is set, but the state still keeps growing

2020-05-17 Post by claylin
The SQL job is defined as follows; the minimum and maximum idle time are also set via TableConfig. But after the job has been running for a long time, the state under the sst directory flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9 keeps growing, the job threads spend a lot of time reading and writing state, and the job ends up constantly backpressured. Any advice would be much appreciated.

CREATE TABLE yy_yapmnetwork_original (
    happenAt BIGINT, uid BIGINT, appId STRING, deviceId STRING, appVer STRING, dnsDur BIGINT,
    useGlb INT, hitCache INT, requestSize DOUBLE, responseSize DOUBLE, totalDur BIGINT,
    url STRING, statusCode INT, prototype STRING, netType STRING, traceId STRING,
    ts AS CAST(FROM_UNIXTIME(happenAt/1000) AS TIMESTAMP(3)),
    WATERMARK FOR ts AS ts - INTERVAL '20' SECOND
) with (
    'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'yapm_metrics',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'kafkawx007-core001.yy.com:8101,kafkawx007-core002.yy.com:8101,kafkawx007-core003.yy.com:8101',
    'connector.properties.group.id' = 'interface_success_rate_consumer',
    'connector.startup-mode' = 'latest-offset', 'format.type' = 'json'
);
create table request_latency_tbl (
    app_id string, app_ver string, net_type string, prototype string, url string, status_code int,
    w_start string, success_cnt BIGINT, failure_cnt BIGINT, total_cnt BIGINT
) with (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3315/yapm_metrics?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnect=true',
    'connector.table' = 'request_latency_statistics', 'connector.username' = 'yapm_metrics',
    'connector.password' = '1234456', 'connector.write.flush.max-rows' = '1000',
    'connector.write.flush.interval' = '5s', 'connector.write.max-retries' = '2'
);
create view request_1minutes_latency as
    select appId, appVer, netType, prototype, url, statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm') w_start,
        count(distinct traceId) filter (where statusCode in (200)) as successCnt,
        count(distinct traceId) filter (where statusCode not in (200)) as failureCnt,
        count(distinct traceId) as total_cnt
    from yy_yapmnetwork_original
    group by appId, appVer, netType, prototype, url, statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm');
insert into request_latency_tbl select * from request_1minutes_latency;

Re: Understanding flink barrier alignment

2020-05-17 Post by 了不起的盖茨比
OK, thanks, I will take a look.





------------------ Original Message ------------------
From: tison
[1]
https://ci.apache.org/projects/flink/flink-docs-master/fig/stream_aligning.svg
[2]
https://ci.apache.org/projects/flink/flink-docs-master/concepts/stateful-stream-processing.html




了不起的盖茨比 <573693...@qq.com> wrote on Sun, May 17, 2020 at 2:50 PM:


 
My understanding is that a record passes through n operators, and a checkpoint can only be taken after the record has reached the last operator and finished being processed; otherwise the state of the earlier operators would already have changed while the record's offset has not been committed, leading to duplicate consumption.





------------------ Original Message ------------------
From: Benchao Li 

Re: Understanding flink barrier alignment

2020-05-17 Post by tison
Hi,

You can take a look at the classic figure on the official site [1][2]. Snapshots are taken per operator; it has nothing to do with whether the operator is a source, and the global
checkpoint is coordinated by the checkpoint coordinator on the JM.

Best,
tison.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/fig/stream_aligning.svg
[2]
https://ci.apache.org/projects/flink/flink-docs-master/concepts/stateful-stream-processing.html
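(A small sketch of the knob this alignment hangs off, using the Flink 1.10 DataStream API; the interval value is arbitrary. Alignment happens per subtask across all of its input channels when checkpoints run in EXACTLY_ONCE mode, and is skipped in AT_LEAST_ONCE mode:)

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BarrierAlignmentSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // EXACTLY_ONCE checkpoints require each subtask to align the barriers arriving
        // from all of its upstream subtasks before snapshotting its state.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // AT_LEAST_ONCE skips the alignment phase, trading possible duplicates on
        // recovery for lower checkpointing latency.
        // env.enableCheckpointing(60_000, CheckpointingMode.AT_LEAST_ONCE);
    }
}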




了不起的盖茨比 <573693...@qq.com> wrote on Sun, May 17, 2020 at 2:50 PM:

>
> My understanding is that a record passes through n operators, and a checkpoint can only be taken after the record has reached the last operator and finished being processed; otherwise the state of the earlier operators would already have changed while the record's offset has not been committed, leading to duplicate consumption.
>
>
>
>
>
> ------------------ Original Message ------------------
> From: Benchao Li  Sent: May 17, 2020 13:28
> To: user-zh  Subject: Re: Understanding flink barrier alignment
>
>
>
> I think it works like this:
>
> Say there are two operators
> A ---hash--- B
>
> A has parallelism 2 and B has parallelism 3. Then, for a given subtask of B, it has to align the barriers sent by the 2 upstream subtasks of A before it can take its checkpoint.
>
>
> 了不起的盖茨比 <573693...@qq.com> wrote on Sun, May 17, 2020 at 1:16 PM:
>
>  So can it be understood as: when there are multiple subtasks, we need to wait for the different subtasks to finish consuming the data before the checkpoint is taken?
> 
> 
> 
> 
> 
>  ------------------ Original Message ------------------
>  From: Benchao Li   Sent: May 17, 2020 11:34
>  To: user-zh   Subject: Re: Understanding flink barrier alignment
> 
> 
> 
>  Hi,
> 
>  I am not very familiar with this part, but my understanding is that barrier alignment refers to alignment across the multiple subtasks of the same Task.
>  For example, even if you have only one source and then do other operations after a keyBy, barrier alignment still happens.
> 
>  了不起的盖茨比 <573693...@qq.com> wrote on Sun, May 17, 2020 at 11:29 AM:
> 
>  > A quick question: if there is only one source, is alignment unnecessary? Is alignment only needed when there are multiple sources?
> 
> 
> 
>  --
> 
>  Benchao Li
>  School of Electronics Engineering and Computer Science, Peking
> University
>  Tel:+86-15650713730
>  Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Understanding flink barrier alignment

2020-05-17 Post by 了不起的盖茨比
My understanding is that a record passes through n operators, and a checkpoint can only be taken after the record has reached the last operator and finished being processed; otherwise the state of the earlier operators would already have changed while the record's offset has not been committed, leading to duplicate consumption.





------------------ Original Message ------------------
From: Benchao Li