Re: 朝鲜第二次发射军事侦察卫星,对话是解决问题的“唯一出路”

2023-08-25 Thread tison
I suggest we should ban this spamming source..

Best,
tison.


北野 曉美  于2023年8月26日周六 08:11写道:

> 朝鲜第二次发射军事侦察卫星,对话是解决问题的“唯一出路”
>
> 一位联合国高级政治事务官员今天表示,朝鲜最近发射的军事侦察卫星对国际民航和海上交通构成了重大威胁,强调需要采取切实可行的措施来缓解朝鲜半岛的紧张局势,为对话创造空间。
>
>
> 和平与安全
> 考克斯巴扎尔的一个罗兴亚难民营受到气旋“摩卡”冲击。
> 缅甸罗兴亚危机爆发六周年,联合国呼吁寻求全面、持久和包容解决方案
>
> 在8月25日缅甸西部若开邦罗兴亚人开始大规模出逃六年整之际,联合国秘书长古特雷斯通过发言人发表声明,呼吁国际社会继续针对缅甸危机寻求全面、包容和持久的解决方案。
>
>
> 人道主义援助
> 联合国继续向刚果(金)东部北基伍省因武装冲突而流离失所的人们提供人道主义援助。
> 刚果民主共和国670万人面临严重粮食不安全
>
> 世界粮食计划署今天表示,在刚果民主共和国东部的北基伍省、南基伍省和伊图里省,670万人面临严重的粮食不安全问题。然而,援助资源严重不足,无法满足当地的高位人道主义需求。
>
>
> 人道主义援助
> 一户苏丹家庭在位于乍得边境的难民入境点避难。
> 紧急救济协调员:战争和饥饿恐将摧毁苏丹
>
> 主管人道主义事务副秘书长兼紧急救济协调员格里菲思今天表示,苏丹战乱或将使该国儿童成为“迷失的一代”,他们的未来岌岌可危,如果冲突和饥饿蔓延,可能会摧毁这个国家。
>
>
> 人道主义援助
> 女孩们坐在泥屋受损的墙上,这座房子在2022年巴基斯坦的洪水中几乎被摧毁。
> 巴基斯坦洪灾一周年:儿童的灾难仍在继续
> 在前所未有的洪灾侵袭巴基斯坦一周年之际,联合国儿童基金会驻巴基斯坦代表法迪尔(Abdullah
> Fadil)今天发表讲话称,由于缺少恢复和重建的资金,巴基斯坦数百万儿童仍需依赖人道主义援助,400万儿童无法获得安全的饮用水。
>
>


Re: Re: 我上报的一个sql bug没人处理怎么办?

2023-03-22 Thread tison
如果 calcite 层的接口不变,直接替换 jar 包或许也可行?不确定从 1.27 -> 1.29 有没有不兼容的情况。

Best,
tison.


Jane Chan  于2023年3月22日周三 18:11写道:

> Hi,
>
> 如回复所述, 如果不想切换版本, 在 1.15 上可以尝试手动 cast 'abc' 字段为 varchar 来绕过这个问题
> map ['msg_code','0', 'msg_reason', cast('abc' as string)]
>
> 如果不想修改 SQL, 目前只能手动编译出 release-1.17 分支, 编译方法参考 [1]
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/
>
> 祝好!
> Jane
>
> On Wed, Mar 22, 2023 at 6:04 PM Jeff  wrote:
>
> > 通过读calcite1.27.0相关源码发现它已经修复了,但我使用的是flink 1.15无法直接使用1.27.0,所以只能使用本地编译的版本么?
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2023-03-22 10:41:42,"Shuo Cheng"  写道:
> > >Hi,
> > >
> > >如果你知道问题出现在哪儿, 可以自己提个 PR 哦.
> > >
> > >Sincerely,
> > >Shuo
> > >
> > >On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
> > >
> > >> 复制执行我提供的两个sql就一定会复现!
> > >> 不管哪个flink版本一定都会有这个问题,因为它们都是使用calcite 1.26.0。
> > >> 这个问题是这个版本calcite引起的。
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> 在 2023-03-22 09:28:17,"Jeff"  写道:
> > >> >bug地址:
> > >> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
> > >> >
> > >> >
> > >> >bug详细内容:
> > >> >the values of map are truncated by the CASE WHEN
> > function.
> > >> >// sql
> > >> >create table test (a map) with
> ('connector'='print');
> > >> >insert into test  select * from (values(case when true then
> > >> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
> > >> end));
> > >> >
> > >> >the result:
> > >> >
> > >> >+I[{test=123}]
> > >> >
> > >> >We hope the value of result is '123456789', but I get '123', the
> length
> > >> is limited by 'abc'.
> > >>
> >
>


Re: 我上报的一个sql bug没人处理怎么办?

2023-03-21 Thread tison
你可以关注下发布动态,测试一下 RC
https://lists.apache.org/thread/d9o0tgnv0fl9goqsdo8wmq9121b9wolv

Best,
tison.


tison  于2023年3月22日周三 11:47写道:

> Flink master 上 calcite 的版本是 1.29,看起来会在 Flink 1.17 release 出来
>
> Best,
> tison.
>
>
> Shuo Cheng  于2023年3月22日周三 11:42写道:
>
>> Hi,
>>
>> 如果你知道问题出现在哪儿, 可以自己提个 PR 哦.
>>
>> Sincerely,
>> Shuo
>>
>> On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
>>
>> > 复制执行我提供的两个sql就一定会复现!
>> > 不管哪个flink版本一定都会有这个问题,因为它们都是使用calcite 1.26.0。
>> > 这个问题是这个版本calcite引起的。
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > 在 2023-03-22 09:28:17,"Jeff"  写道:
>> > >bug地址:
>> > >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
>> > >
>> > >
>> > >bug详细内容:
>> > >the values of map are truncated by the CASE WHEN
>> function.
>> > >// sql
>> > >create table test (a map) with ('connector'='print');
>> > >insert into test  select * from (values(case when true then
>> > map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
>> > end));
>> > >
>> > >the result:
>> > >
>> > >+I[{test=123}]
>> > >
>> > >We hope the value of result is '123456789', but I get '123', the length
>> > is limited by 'abc'.
>> >
>>
>


Re: 我上报的一个sql bug没人处理怎么办?

2023-03-21 Thread tison
Flink master 上 calcite 的版本是 1.29,看起来会在 Flink 1.17 release 出来

Best,
tison.


Shuo Cheng  于2023年3月22日周三 11:42写道:

> Hi,
>
> 如果你知道问题出现在哪儿, 可以自己提个 PR 哦.
>
> Sincerely,
> Shuo
>
> On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
>
> > 复制执行我提供的两个sql就一定会复现!
> > 不管哪个flink版本一定都会有这个问题,因为它们都是使用calcite 1.26.0。
> > 这个问题是这个版本calcite引起的。
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2023-03-22 09:28:17,"Jeff"  写道:
> > >bug地址:
> > >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
> > >
> > >
> > >bug详细内容:
> > >the values of map are truncated by the CASE WHEN
> function.
> > >// sql
> > >create table test (a map) with ('connector'='print');
> > >insert into test  select * from (values(case when true then
> > map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc']
> > end));
> > >
> > >the result:
> > >
> > >+I[{test=123}]
> > >
> > >We hope the value of result is '123456789', but I get '123', the length
> > is limited by 'abc'.
> >
>


Re: Flink作业修改时State的兼容性

2022-07-30 Thread tison
这个同样可以看两份材料

*
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/
* https://zhuanlan.zhihu.com/p/119305376

简单说来只是改并发的话 Key state 比较好迁移,operator state 各有各的特性。如果明显改了逻辑就需要手动跑出新状态。

Best,
tison.


m18814122325  于2022年7月30日周六 14:21写道:

> 当Flink作业在业务变更修改时,在用checkpoint或者savepoint进行状态恢复时,兼容性如何?有相关文档吗?
>
> 我在阿里云找到SQL修改和兼容性的文档https://help.aliyun.com/document_detail/403317.html。
> 但不知道文档说明的是阿里云里的Flink版本还是开源版本。


Re: Flink内部如何做到消息不丢失?

2022-07-30 Thread tison
可以看下这两份材料

*
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/fault-tolerance/checkpointing/
* https://zhuanlan.zhihu.com/p/102607983

其实就是 Flink 里 Exactly-once 的实现方式,简单说来就是分布式快照批量提交,上游数据可以回放。

Best,
tison.


m18814122325  于2022年7月30日周六 14:22写道:

>
> 在Storm中会有ack机制来保证消息是否被下个算子是否被处理,那么请问在Flink框架内部中上游算子通过Netty发送消息到下游时,如何做到消息不会因为网络原因等各种异常情况产生丢失情况?
>
> 谢谢


Re: 反复提交Job会导致TaskManager 元空间oom?

2022-07-04 Thread tison
你这个邮件地址有点不对劲,可能被邮件列表拦截了,或者没有先通过 user-zh-subscr...@flink.apache.org
订阅。因为上面已经有人回复你了,但是你好像还没看到。

如何订阅邮件列表可以自己搜一下,我记得 Flink China 写过文章手把手教学。在此之前你可以通过
https://lists.apache.org/thread/7v19bkqqwp49vpdmkcr4yvdh6bn5bfkm 看看其他人的回复。

Best,
tison.


LuNing Wang  于2022年7月4日周一 17:11写道:

> 目前我觉得最好的解决办法就是定期重启JM和TM进程。
>
>
> 知而不惑  于2022年7月4日周一 17:07写道:
>
> > 这个问题没有人解答吗?
> >
> >
> >
> >
> > --原始邮件--
> > 发件人:
> >   "知而不惑"
> > <
> > chenliangv...@qq.com;
> > 发送时间:2022年5月18日(星期三) 上午10:45
> > 收件人:"user-zh" >
> > 主题:反复提交Job会导致TaskManager 元空间oom?
> >
> >
> >
> > 请问大家一个问题,
> > 场景:
> > 版本是Flink 1.14
> > 我们使用standalone 模式,我们的Flink job由supervisorctl托管,JM和TM用systemd托管
> >
> >
> > 异常:
> > job异常重启设置了两次的flink延迟重启:restart-strategy.fixed-delay.attempts: 2,
> > 我们线上有个业务代码没有捕获一个异常,导致job重启两次后,再由supervisorctl重新提交job,循环了很多次之后,
> >
> >
> TM出现了元空间OOM(我们已经把元空间的内存加大,还是会出现),然后TM就掉了,控制台上没有TM了,这影响了其他的job,但是TM进程也没有退出,我们的TM由Systemd托管,所以TM一直没有重启,
> > 处在一个“假死”状态,我们是用的standalone模式,只有一个TM,
> >
> >
> > 日志:
> > TM日志出现:TM metaspace oom
> > JM日志:Association with remote system [akka.tcp://flink@localhost:43583]
> > has failed, address is now gated for [50] ms. Reason: [Association failed
> > with [akka.tcp://flink@localhost:43583]] Caused by:
> > [java.net.ConnectException: Connection refused: localhost/
> 127.0.0.1:43583]
> > JM 连接 TM接口失败,unreachable
> >
> >
> >
> > 补充:
> > 我们把元空间内存配置放到512M。再次重现:
> > 发现每次提交job的时候:
> > 观察tm metaspace 内存变化:179MB 183MB 207MB 232MB 256MB
> > 280MB 352MB 372MB
> > 元空间一直没回收,这样最终会导致TM metaspace oom
> >
> >
> >
> > 问题:
> > 1.想问下TM元空间oom异常,是反复提交job造成,还是job的业务代码有问题,
> > 2.TM元空间OOM为什么会导致JM认为TM掉线,TM也不自己退出进程
> >
> >
> > 希望获得的帮助:
> > 1.上述问题原因
> > 2.有什么办法可以在standalone模式下,识别到TM掉线,从而我们能做一些自动的运维操作:比如重启整个集群
>


Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.0.0 released

2022-06-05 Thread tison
Congrats! Thank you all for making this release happen.

Best,
tison.


rui fan <1996fan...@gmail.com> 于2022年6月5日周日 17:19写道:

> Thanks Yang for driving the release, and thanks to
> all contributors for making this release happen!
>
> Best wishes
> Rui Fan
>
> On Sun, Jun 5, 2022 at 4:14 PM Yang Wang  wrote:
>
> > The Apache Flink community is very happy to announce the release of
> Apache
> > Flink Kubernetes Operator 1.0.0.
> >
> > The Flink Kubernetes Operator allows users to manage their Apache Flink
> > applications and their lifecycle through native k8s tooling like kubectl.
> > This is the first production ready release and brings numerous
> > improvements and new features to almost every aspect of the operator.
> >
> > Please check out the release blog post for an overview of the release:
> >
> >
> https://flink.apache.org/news/2022/06/05/release-kubernetes-operator-1.0.0.html
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Maven artifacts for Flink Kubernetes Operator can be found at:
> >
> >
> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
> >
> > Official Docker image for Flink Kubernetes Operator applications can be
> > found at:
> > https://hub.docker.com/r/apache/flink-kubernetes-operator
> >
> > The full release notes are available in Jira:
> >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351500
> >
> > We would like to thank all contributors of the Apache Flink community who
> > made this release possible!
> >
> > Regards,
> > Gyula & Yang
> >
>


Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.0.0 released

2022-06-05 Thread tison
Congrats! Thank you all for making this release happen.

Best,
tison.


rui fan <1996fan...@gmail.com> 于2022年6月5日周日 17:19写道:

> Thanks Yang for driving the release, and thanks to
> all contributors for making this release happen!
>
> Best wishes
> Rui Fan
>
> On Sun, Jun 5, 2022 at 4:14 PM Yang Wang  wrote:
>
> > The Apache Flink community is very happy to announce the release of
> Apache
> > Flink Kubernetes Operator 1.0.0.
> >
> > The Flink Kubernetes Operator allows users to manage their Apache Flink
> > applications and their lifecycle through native k8s tooling like kubectl.
> > This is the first production ready release and brings numerous
> > improvements and new features to almost every aspect of the operator.
> >
> > Please check out the release blog post for an overview of the release:
> >
> >
> https://flink.apache.org/news/2022/06/05/release-kubernetes-operator-1.0.0.html
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Maven artifacts for Flink Kubernetes Operator can be found at:
> >
> >
> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
> >
> > Official Docker image for Flink Kubernetes Operator applications can be
> > found at:
> > https://hub.docker.com/r/apache/flink-kubernetes-operator
> >
> > The full release notes are available in Jira:
> >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351500
> >
> > We would like to thank all contributors of the Apache Flink community who
> > made this release possible!
> >
> > Regards,
> > Gyula & Yang
> >
>


Re: 对Flink Table Store咨询

2022-04-23 Thread tison
Flink Table Store 不是应用,而是库。我理解是要配合 Flink
来使用的,断点调试的话,看你的需求,如果只是对一段代码有疑问,跑测试打断点就行了。

Best,
tison.


陈卓宇 <2572805...@qq.com.invalid> 于2022年4月24日周日 09:59写道:

> 您好大佬:
>  我对Flink Table
> Store非常感兴趣,想请教您一下怎么结合flink做断点调试,因为看了一下没有找到入口类
>
> 陈卓宇
>
>
> 


Re: Flink on yarn ,并行度>1的情况下,怎么获取springboot的bean?

2022-04-22 Thread tison
@duwenwen 我比较好奇你的算子里做了什么,因为如果你就是要全局初始化唯一一次,那就用一个 parallelism=1 的算子去做就好了。

parallelism=n 你还要确保 once 的话应该得依赖外部系统来做到仅初始化一次。

Best,
tison.


Paul Lam  于2022年4月22日周五 18:16写道:

> 听起来是在 Flink 里启动 springboot? 很有意思的架构,有一点点类似 statefun 了。可以说说这么做的背景吗?
>
> 另外请附带上 flink 的部署模式和版本信息,这样大家才好判断问题在哪里。
>
> Best,
> Paul Lam
>
> > 2022年4月22日 16:30,duwenwen  写道:
> >
> > 您好:
> >首先很感谢您能在百忙之中看到我的邮件。我是一个写代码的新手,在使用flink框架过程中我遇到了一些问题,希望能得到您的解答。
> 由于需求要求,我需要将springboot和flink结合起来使用,我在open方法中获取springboot的上下文来获取bean。当设置parallelism为1时,可以发布到集群正常运行,但是当parallelism>1时,springboot的环境被多次初始化,运行就开始报错,,请问当parallelism>1
> 或者说当taskmanager>1时,应该怎么处理才能顺利获取到springboot的bean?针对上述问题希望能从您那里获得相应解决方案,十分感谢。
>
>


Re: flink table store

2022-04-07 Thread tison
我有点好奇官网看到的链接在哪,能不能来个链接捏。

Best,
tison.


Leonard Xu  于2022年4月7日周四 14:47写道:

>
> 项目是开源的[1], 最近快要发布第一个版本了,可以关注下
>
> Best,
> Leonard
> [1] https://github.com/apache/flink-table-store <
> https://github.com/apache/flink-table-store>
>
>
>
> > 2022年4月7日 上午9:54,Xianxun Ye  写道:
> >
> > 这里有 flink table store 的设计文档,你可以了解下。
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> >
> >
> > Best regards,
> >
> >
> > Xianxun
> >
> >
> > On 04/6/2022 16:56,LuNing Wang wrote:
> > Hi,
> >
> > Table store是存储,应和数据湖类似
> >
> > Best,
> > LuNing Wang
> >
> > yidan zhao  于2022年4月6日周三 16:55写道:
> >
> > 看官网出来个 flink table store,和 flink table、flink sql 那一套有啥区别呢?
> >
>
>


Re: 如何实现event triggered window?

2021-11-23 Thread tison
如果就是要 Event Trigger 那就是传统意义上的 Sliding Windows (Flink 的滑动窗口其实是 Hopping
Window),这个确实是 OVER AGG 能搞定的 =。=

Best,
tison.


Tony Wei  于2021年11月23日周二 下午2:06写道:

> Hi Pinjie,
>
> 如果是需要 event triggered 的累計統計更新的話,可以考慮使用 SQL over aggregation
> [1]。例如文件中提供的如下範例,計算當前 row  往前一小時內的加總結果。
>
> > SELECT order_id, order_time, amount,
> >   SUM(amount) OVER (
> > PARTITION BY product
> > ORDER BY order_time
> > RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
> >   ) AS one_hour_prod_amount_sumFROM Orders
> >
> > 但是這種作法只能根據收到的事件來觸發,無法根據處理時間。換句話說,如果 t=X 沒有數據進來的話,就不會有 t=(X-1) ~ X
> 的累計統計輸出。
> 考慮更複雜的情況需要結合事件和處理時間來觸發的話,需要透過 Process Function API 或者用 DataStream API 自定義
> Trigger 的方式實現。
>
> best regards,
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/over-agg/
>
> tison  於 2021年11月23日 週二 下午2:03寫道:
>
> > 如果你是想每时每刻(实际上开销很大,假设是每 1 分钟),那就用 Sliding Window
> >
> > Best,
> > tison.
> >
> >
> > tison  于2021年11月23日周二 下午2:00写道:
> >
> > > 你的理解里就是必须整点对齐嘛,那其实是可以加个 offset 不整点对齐的捏。
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > tison  于2021年11月23日周二 下午1:59写道:
> > >
> > >>
> > >>
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/
> > >>
> > >> 你可以关注一下这里有个 offset 参数,这样就能用 tumbling window 了 =。=
> > >>
> > >> Best,
> > >> tison.
> > >>
> > >>
> > >> Pinjie Huang  于2021年11月23日周二
> > 下午1:18写道:
> > >>
> > >>> Hi Yidan,
> > >>>
> > >>> Tumbling window 只有
> > >>> t=0~1h
> > >>> t=1~2h
> > >>> 等等的window
> > >>>
> > >>> 我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如
> > >>> t=1.5h 时刻 我需要 t=0.5~1.5h 这个window
> > >>>
> > >>> On Tue, Nov 23, 2021 at 12:32 PM yidan zhao 
> > wrote:
> > >>>
> > >>> > 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。
> > >>> >
> > >>> > zhiyuan su  于2021年11月22日周一 下午4:59写道:
> > >>> >
> > >>> > > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足
> > >>> > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL=1
> > >>> > > 具体在第2章第一节
> > >>> > >
> > >>> > > Pinjie Huang  于2021年11月22日周一
> > >>> > 下午3:52写道:
> > >>> > >
> > >>> > > > Hi friends,
> > >>> > > >
> > >>> > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event
> > triggerred。
> > >>> > > >
> > >>> > > > 比如说 想知道过去1小时event A trigger的次数,
> > >>> > > >
> > >>> > > > 如果使用tumbling window和1h window
> > >>> > > > |1h | 1h |
> > >>> > > > t=0
> > >>> > > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。
> > >>> > > >
> > >>> > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。
> > >>> > > >
> > >>> > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window?
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >>>
> > >>> --
> > >>> Thanks,
> > >>> Pinjie Huang
> > >>>
> > >>
> >
>


Re: 如何实现event triggered window?

2021-11-22 Thread tison
如果你是想每时每刻(实际上开销很大,假设是每 1 分钟),那就用 Sliding Window

Best,
tison.


tison  于2021年11月23日周二 下午2:00写道:

> 你的理解里就是必须整点对齐嘛,那其实是可以加个 offset 不整点对齐的捏。
>
> Best,
> tison.
>
>
> tison  于2021年11月23日周二 下午1:59写道:
>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/
>>
>> 你可以关注一下这里有个 offset 参数,这样就能用 tumbling window 了 =。=
>>
>> Best,
>> tison.
>>
>>
>> Pinjie Huang  于2021年11月23日周二 下午1:18写道:
>>
>>> Hi Yidan,
>>>
>>> Tumbling window 只有
>>> t=0~1h
>>> t=1~2h
>>> 等等的window
>>>
>>> 我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如
>>> t=1.5h 时刻 我需要 t=0.5~1.5h 这个window
>>>
>>> On Tue, Nov 23, 2021 at 12:32 PM yidan zhao  wrote:
>>>
>>> > 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。
>>> >
>>> > zhiyuan su  于2021年11月22日周一 下午4:59写道:
>>> >
>>> > > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足
>>> > >
>>> > >
>>> >
>>> https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL=1
>>> > > 具体在第2章第一节
>>> > >
>>> > > Pinjie Huang  于2021年11月22日周一
>>> > 下午3:52写道:
>>> > >
>>> > > > Hi friends,
>>> > > >
>>> > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。
>>> > > >
>>> > > > 比如说 想知道过去1小时event A trigger的次数,
>>> > > >
>>> > > > 如果使用tumbling window和1h window
>>> > > > |1h | 1h |
>>> > > > t=0
>>> > > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。
>>> > > >
>>> > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。
>>> > > >
>>> > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window?
>>> > > >
>>> > >
>>> >
>>>
>>>
>>> --
>>> Thanks,
>>> Pinjie Huang
>>>
>>


Re: 如何实现event triggered window?

2021-11-22 Thread tison
你的理解里就是必须整点对齐嘛,那其实是可以加个 offset 不整点对齐的捏。

Best,
tison.


tison  于2021年11月23日周二 下午1:59写道:

>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/
>
> 你可以关注一下这里有个 offset 参数,这样就能用 tumbling window 了 =。=
>
> Best,
> tison.
>
>
> Pinjie Huang  于2021年11月23日周二 下午1:18写道:
>
>> Hi Yidan,
>>
>> Tumbling window 只有
>> t=0~1h
>> t=1~2h
>> 等等的window
>>
>> 我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如
>> t=1.5h 时刻 我需要 t=0.5~1.5h 这个window
>>
>> On Tue, Nov 23, 2021 at 12:32 PM yidan zhao  wrote:
>>
>> > 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。
>> >
>> > zhiyuan su  于2021年11月22日周一 下午4:59写道:
>> >
>> > > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足
>> > >
>> > >
>> >
>> https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL=1
>> > > 具体在第2章第一节
>> > >
>> > > Pinjie Huang  于2021年11月22日周一
>> > 下午3:52写道:
>> > >
>> > > > Hi friends,
>> > > >
>> > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。
>> > > >
>> > > > 比如说 想知道过去1小时event A trigger的次数,
>> > > >
>> > > > 如果使用tumbling window和1h window
>> > > > |1h | 1h |
>> > > > t=0
>> > > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。
>> > > >
>> > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。
>> > > >
>> > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window?
>> > > >
>> > >
>> >
>>
>>
>> --
>> Thanks,
>> Pinjie Huang
>>
>


Re: 如何实现event triggered window?

2021-11-22 Thread tison
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/

你可以关注一下这里有个 offset 参数,这样就能用 tumbling window 了 =。=

Best,
tison.


Pinjie Huang  于2021年11月23日周二 下午1:18写道:

> Hi Yidan,
>
> Tumbling window 只有
> t=0~1h
> t=1~2h
> 等等的window
>
> 我现在需要在 t=X 时刻,得到 t=(X-1) ~ X 的window 比如
> t=1.5h 时刻 我需要 t=0.5~1.5h 这个window
>
> On Tue, Nov 23, 2021 at 12:32 PM yidan zhao  wrote:
>
> > 其实问题还是没描述清楚,所以遇到问题究竟是啥,没发现tumbling window不行。
> >
> > zhiyuan su  于2021年11月22日周一 下午4:59写道:
> >
> > > 感觉你的场景是近实时累计统计,你可以参考下window TVF 看下是否满足
> > >
> > >
> >
> https://flink-learning.org.cn/article/detail/a8b0895d4271bf6b770927eea214612d?tab=SQL=1
> > > 具体在第2章第一节
> > >
> > > Pinjie Huang  于2021年11月22日周一
> > 下午3:52写道:
> > >
> > > > Hi friends,
> > > >
> > > > Flink 自带的window 有tumlbing sliding 和 session 但是似乎没有event triggerred。
> > > >
> > > > 比如说 想知道过去1小时event A trigger的次数,
> > > >
> > > > 如果使用tumbling window和1h window
> > > > |1h | 1h |
> > > > t=0
> > > > 在t=1.5h时刻,读取数据,是t=1h 时刻过去一小时的数据,而不是实时的。
> > > >
> > > > 使用sliding window 的话需要define非常小的slide,而且依旧可能有延迟。
> > > >
> > > > 如果想知道实时的数据,需要基于event来更新state,如何实现event triggered window?
> > > >
> > >
> >
>
>
> --
> Thanks,
> Pinjie Huang
>


Re: rocksdb状态后端最多保留checkpoints问题

2021-05-27 Thread tison
rocksdb 增量 checkpoint 不是你这么理解的,总的不会恢复不了。原因可以参考下面的材料

-
https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
官方 blog 介绍
- https://www.bilibili.com/video/BV1db411e7x2 施博士的介绍,大概 24 分钟开始讲

Best,
tison.


casel.chen  于2021年5月27日周四 下午11:35写道:

> 作业中使用了增量式的rocksdb状态后端,请问如果再设置了最多保留checkpoints个数的话,会不会造成rocksdb
> state恢复失败?例如,假设完整的state恢复需要最近10个chk,但因为设置了最多保留checkpoints个数为5的话,状态是不是恢复不了了?


Re: 关于flink CheckPoint 状态数据保存的问题

2021-04-01 Thread tison
这个配置本身我看了一下只能走 flink-conf.yaml,而且似乎是 per cluster 配置的,虽然 perjob /
application 部署的时候没啥问题,但是 session 可能就不行了。配置这块 Flink 是有点全走 flink-conf.yaml +
默认你是用 perjob / application 的意思。

你提的数据看不到的问题,首先确认一下是否 chk 真的有数据。另外我依稀记得 tangyun(in cc) 做过一个改动,可以问下他的看法。

Best,
tison.


tison  于2021年4月1日周四 下午3:50写道:

> 只有一个的问题是因为默认保留的 chk 数量是一个,可以修改这个配置[1]来改变。
>
> Best,
> tison.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#state-checkpoints-num-retained
>
>
> lp <973182...@qq.com> 于2021年4月1日周四 下午3:48写道:
>
>> 我写了一个带状态的function
>> 采用了如下cp配置:
>> env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
>> env.getCheckpointConfig().setCheckpointTimeout(6L);
>>
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> env.setStateBackend(new FsStateBackend("file:///cp/eventCounter"));
>>
>> 请教几个问题:
>> ①按照官网的介绍,目录数据应该是这样的
>> /user-defined-checkpoint-dir
>> /{job-id}
>> |
>> + --shared/
>> + --taskowned/
>> + --chk-1/
>> + --chk-2/
>> + --chk-3/
>> ...
>>
>> 但是我的测试是job正常running时,chk-*永远只有一个,每次做chk,递增 +1 一次
>>
>>
>> ②状态数据按照理解是保存在chk-*下面的,但是我的测试下面只有一个_metadata,并没有每次chk的数据,使用的flink1.12
>> 当我改成使用flink1.8时,是可以看到如下chk-*目录下除了_metadata,还有每次的chk数据.
>> 所以flink1.12高版本的情况每次chk的数据在哪里
>>
>>
>>
>> ③按照官网介绍,默认只保留最新的一份chk数据,如果想保留最近的多份,除了全局flink-conf.yaml中配置state.checkpoints.num-retained:
>> 5, 有程序中使用env 针对每job的配置方式吗
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>


Re: 关于flink CheckPoint 状态数据保存的问题

2021-04-01 Thread tison
只有一个的问题是因为默认保留的 chk 数量是一个,可以修改这个配置[1]来改变。

Best,
tison.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#state-checkpoints-num-retained


lp <973182...@qq.com> 于2021年4月1日周四 下午3:48写道:

> 我写了一个带状态的function
> 采用了如下cp配置:
> env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
> env.getCheckpointConfig().setCheckpointTimeout(6L);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setStateBackend(new FsStateBackend("file:///cp/eventCounter"));
>
> 请教几个问题:
> ①按照官网的介绍,目录数据应该是这样的
> /user-defined-checkpoint-dir
> /{job-id}
> |
> + --shared/
> + --taskowned/
> + --chk-1/
> + --chk-2/
> + --chk-3/
> ...
>
> 但是我的测试是job正常running时,chk-*永远只有一个,每次做chk,递增 +1 一次
>
>
> ②状态数据按照理解是保存在chk-*下面的,但是我的测试下面只有一个_metadata,并没有每次chk的数据,使用的flink1.12
> 当我改成使用flink1.8时,是可以看到如下chk-*目录下除了_metadata,还有每次的chk数据.
> 所以flink1.12高版本的情况每次chk的数据在哪里
>
>
>
> ③按照官网介绍,默认只保留最新的一份chk数据,如果想保留最近的多份,除了全局flink-conf.yaml中配置state.checkpoints.num-retained:
> 5, 有程序中使用env 针对每job的配置方式吗
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink Job 如何集成到自己的系统,方便管理

2021-03-08 Thread tison
>意思是可以通过相关API,去读一个jar包并提交Job吗?要提交到的集群也是通过配置参数传入代码里,是大
概这样的一个过程吗?有相关的文档或者demo吗?我在网上一直找不到相关内容。

是的,目前公开的 API 是命令行,内部是 ClusterDescriptor、CliFrontend
等一系列类在驱动。定制的时候通常直接根据内部类来编程,但是它们不是公开接口,可能随时会改变。目前没有更好的办法。

>如果和自己系统集成的话,是把这些页面以超链接的形式集成到系统里面吗,在系统dashboard中点某个按钮,跳转到flink webui的某一个模块里?

这个集成有很多种办法了,包括你页面嵌套页面,或者页面跳转页面,或者直接二开 Flink Web 模块,或者在完全自主开发的页面里调用 REST
API,等等。

Best,
tison.


Jacob <17691150...@163.com> 于2021年3月9日周二 上午9:42写道:

> 谢谢提供思路,刚通过接口编程这个思路找到了一些文章和demo。
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 关于Watermark的使用调试问题

2021-03-07 Thread tison
可以中途产生,走这个接口

org.apache.flink.streaming.api.datastream.DataStream#assignTimestampsAndWatermarks(org.apache.flink.api.common.eventtime.WatermarkStrategy)

麻烦贴一下你加 watermark 的代码和 pipeline 看一下啥情况

Best,
tison.


Xavier  于2021年3月7日周日 下午7:51写道:

>想问下社区,watermark必须加在数据源上吗?顺便想问下一般用什么方式来调watermark,我自己本地有发现加在map
> function之后,watermark会自动重置为默认值的情况。
> 谢谢!
> --
>
> Best Regards,
> *Xavier*
>


Re: Flink Job 如何集成到自己的系统,方便管理

2021-03-06 Thread tison
有的,通过 FLINK 和 YARN 或 k8s 的接口进行编程,管理元数据,管理用户文件,支持提交作业及之后管理作业状态,这是许多公司应用
Flink 的实现方式。

你说的前端的东西是对接口返回值的友好展示,Flink 本身有一个 Web 前端,可以支持你要的大部分功能,但是多任务可能会有一些缺陷,尤其你不是使用
session 模式的情况下。

向下整合 Flink 能力以及实际部署的集群信息,向上提供人性化的交互页面,按照传统的 Web App 开发思路就可以的。

Best,
tison.


Jacob <17691150...@163.com> 于2021年3月6日周六 下午4:00写道:

> 我们现在提交Flink Job
> 是通过flink客户端run命令提交job,进行实时任务的计算,每次提交都要登录prd机器,上传jar包,过程比较麻烦。
>
>
> 后期规划把实时计算的任务集成到我们已有的一个系统中,把上面描述的过程封装起来,给用户提供一些按钮、菜单等,理想状态下,在这个系统增加一些模块、菜单之类的东西,就能完成对Job的维护,包括提交Job、查看正在运行的Job、停止Job等等
>
>
> 上面所说的这个系统是我们自研的一个数据处理平台,实时计算任务也是其中的一环,因此就想把实时计算的任务的模块也集成到其中去。
>
>
> 不知道这有没有可能实现
>
> 请大佬提供些许思路!感谢
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复:Flink checkpoint 速度慢问题请教

2021-03-02 Thread tison
Hi Jacob,

能通过日志或监控判断是 checkpoint 时 snapshot 的 sync 阶段慢,还是 async 阶段慢,还是上传到 HDFS
时间长或是其他阶段的瓶颈吗?

几十 KB 的状态慢很可能是某个步骤出故障卡住了。

Best,
tison.


yidan zhao  于2021年3月2日周二 下午3:58写道:

> 我比较奇怪的是再慢的磁盘,对于几十KB的状态也不至于“慢”吧。
>
> Jacob <17691150...@163.com> 于2021年3月2日周二 上午10:34写道:
>
> > 谢谢回复
> >
> > 我用的是filesystem,
> > 相关配置如下:
> >
> >
> > state.backend: filesystem
> > state.checkpoints.dir: hdfs://nameservice1/datafeed/prd/flink_checkpoint
> > state.savepoints.dir: hdfs://nameservice1/datafeed/prd/flink_checkpoint
> > state.backend.incremental: false
> > state.backend.fs.memory-threshold: 1024
> > state.checkpoints.num-retained: 3
> > restart-strategy: fixed-delay
> > restart-strategy.fixed-delay.attempts: 1000
> > restart-strategy.fixed-delay.delay: 30 s
> >
> >
> >
> > 后面把上面配置注释掉,然后在代码中指定了checkpoint类型为内存,但速度还是很慢。
> >
> >
> >
> > -
> > Thanks!
> > Jacob
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: 如何在程序里面判断作业是否是重启了

2021-02-04 Thread tison
目前想到的是加一个调度器插件,在重启事件那边 hook 一下。

正常的重启流程貌似没有其他 hook 点了,抄送一下这方面的专家(in cc)看看有没有其他意见。

Best,
tison.


熊云昆  于2021年2月5日周五 上午11:30写道:

>
> super.getRuntimeContext().getAttemptNumber()试试这个方法获取重启次数试试,如果没有重启过是0,反之每重启一次就会加1
>
>
> | |
> 熊云昆
> |
> |
> 邮箱:xiongyun...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2021年02月04日 11:42,op 写道:
> 你好,我们下游不是所有需求都会去重,开销有点大。。。
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> zapj...@163.com;
> 发送时间:2021年2月4日(星期四) 中午11:31
> 收件人:"user-zh"
> 主题:Re:回复: 如何在程序里面判断作业是否是重启了
>
>
>
>
>
>
> 下游数据做好幂等操作,就不怕重复操作了。。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-02-04 11:26:56,"op" <520075...@qq.com 写道:
> 重启可能会导致数据重发,想加个告警
> 
> 
> 
> 
> --nbsp;原始邮件nbsp;--
> 发件人:
> "user-zh"
>  发送时间:nbsp;2021年2月4日(星期四) 中午11:11
> 收件人:nbsp;"user-zh" 
> 主题:nbsp;Re: 如何在程序里面判断作业是否是重启了
> 
> 
> 
> 业务上的需求是什么?
> 
> Best,
> tison.
> 
> 
> op <520075...@qq.comgt; 于2021年2月4日周四 上午11:04写道:
> 
> gt; 大家好:
> gt; amp;nbsp;
> gt;
> amp;nbsp;我在程序里通过RestartStrategies设置了重启策略,现在想在算子里面判断是否是触发了Restart,请问有哪些方法能实现呢?


Re: 如何在程序里面判断作业是否是重启了

2021-02-03 Thread tison
业务上的需求是什么?

Best,
tison.


op <520075...@qq.com> 于2021年2月4日周四 上午11:04写道:

> 大家好:
> 
> 我在程序里通过RestartStrategies设置了重启策略,现在想在算子里面判断是否是触发了Restart,请问有哪些方法能实现呢?


Re: Flink 提交作业时的缓存可以删除吗

2021-02-01 Thread tison
org/apache/flink/yarn/YarnResourceManagerDriver.java:236
org/apache/flink/yarn/YarnClusterDescriptor.java:495

应该是会在作业退出或者强杀的时候清理的,你可以看一下对应版本有无这个逻辑

可以加一下日志看看实际是否触发,删除的是什么目录

Best,
tison.


Robin Zhang  于2021年2月2日周二 下午2:37写道:

> Flink 1.12下会将flink的依赖以及作业的jar包缓存在hdfs上,如下图:
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t447/flink_%E6%8F%90%E4%BA%A4%E6%97%B6%E7%BC%93%E5%AD%98.png>
>
>
>
> 由于flink很早就开始使用了,这种目录越来越多,就算任务不在运行也不会自动清除。经过简单测试,直接删除后,不影响任务的运行以及简单的状态恢复。目前不知道会不会存在其他依赖,希望有清楚的能解释下这个的原理、作用以及能否删除。
> 删除的目的是为了节省hdfs空间,做自身优化;另一方面是想弄清楚这个的原理
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: 水印的作用请教

2021-01-31 Thread tison
对于 StreamingFileSink 可以查看这两份资料

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
https://stackoverflow.com/questions/54763160/is-it-possible-that-bucketing-sink-create-bucket-on-event-time

默认的,watermark 和 (EventTime) Timestamp 信息会带到 BucketAssigner 实现所需的 Context
里,Flink 没有开箱即用的基于 EventTime 的分桶策略,你需要自己尝试实现。比水印晚的数据,可以自行实现为丢弃或追加到原有分区文件上。

对于 SQL 可以查看这份资料

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/filesystem.html

目前看起来开箱的逻辑迟到数据会追加而不是丢弃。有一些相关的配置可以调整 commit 也就是落盘的时机,但不影响落盘的数据。

Best,
tison.


amenhub  于2021年2月1日周一 上午11:07写道:

> StreamAPI使用的是StreamingFileSink,SQL就是FileSystem了
>
>
>
>
> 发件人: tison
> 发送时间: 2021-02-01 11:01
> 收件人: user-zh
> 主题: Re: Re: 水印的作用请教
> 请问你使用哪种 SinkConnector 写入 HDFS 呢?
>
> Best,
> tison.
>
>
> amenhub  于2021年2月1日周一 上午10:58写道:
>
> > >>>
> >
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
> > 另外还是比较想了解,当指定水印及延迟的时候,比水印晚的数据,在流写HDFS的时候会怎么处理?
> >
> >
> >
> >
> > 发件人: amenhub
> > 发送时间: 2021-02-01 10:44
> > 收件人: user-zh
> > 主题: Re: Re: 水印的作用请教
> > 谢谢回复!
> >
> > 也就是说如果我利用Flink从Kafka (Select
> > *)采集数据到HDFS,不涉及Timer触发逻辑,使用水印的目的就只是为了使用事件时间以及HDFS中基于事件时间进行分区目录创建,对吗?
> >
> > best,
> > amenhub
> >
> >
> >
> > 发件人: tison
> > 发送时间: 2021-02-01 10:36
> > 收件人: user-zh
> > 主题: Re: 水印的作用请教
> > 取决于你的计算流图,watermark 通常只在以下情况有实际作用
> > True
> > & cond 1. 使用 EventTime
> > & cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer
> > Best,
> > tison.
> > amenhub  于2021年2月1日周一 上午10:26写道:
> > > hi everyone,
> > >
> > > 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。
> > >
> > > 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。
> > > 那么,
> > > 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算?
> > >
> > >
> >
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
> > >
> > > best,
> > > amenhub
> > >
> > >
> > >
> > >
> >
>


Re: Re: 水印的作用请教

2021-01-31 Thread tison
请问你使用哪种 SinkConnector 写入 HDFS 呢?

Best,
tison.


amenhub  于2021年2月1日周一 上午10:58写道:

> >>>
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
> 另外还是比较想了解,当指定水印及延迟的时候,比水印晚的数据,在流写HDFS的时候会怎么处理?
>
>
>
>
> 发件人: amenhub
> 发送时间: 2021-02-01 10:44
> 收件人: user-zh
> 主题: Re: Re: 水印的作用请教
> 谢谢回复!
>
> 也就是说如果我利用Flink从Kafka (Select
> *)采集数据到HDFS,不涉及Timer触发逻辑,使用水印的目的就只是为了使用事件时间以及HDFS中基于事件时间进行分区目录创建,对吗?
>
> best,
> amenhub
>
>
>
> 发件人: tison
> 发送时间: 2021-02-01 10:36
> 收件人: user-zh
> 主题: Re: 水印的作用请教
> 取决于你的计算流图,watermark 通常只在以下情况有实际作用
> True
> & cond 1. 使用 EventTime
> & cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer
> Best,
> tison.
> amenhub  于2021年2月1日周一 上午10:26写道:
> > hi everyone,
> >
> > 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。
> >
> > 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。
> > 那么,
> > 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算?
> >
> >
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
> >
> > best,
> > amenhub
> >
> >
> >
> >
>


Re: 水印的作用请教

2021-01-31 Thread tison
取决于你的计算流图,watermark 通常只在以下情况有实际作用

True
& cond 1. 使用 EventTime
& cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer

Best,
tison.


amenhub  于2021年2月1日周一 上午10:26写道:

> hi everyone,
>
> 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。
>
> 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。
> 那么,
> 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算?
>
> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区?
>
> best,
> amenhub
>
>
>
>


Re: 为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了

2021-01-31 Thread tison
邮件列表不支持直接粘贴图片,请尝试使用附件或 gist 等方式共享。

Best,
tison.


hezongji...@qq.com  于2021年2月1日周一 上午9:28写道:

> 为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了
> 代码如下:
>
> 运行结果如下:
> --
> hezongji...@qq.com
>


Re: 用application mode部署应用,classloader.resolve-order参数是否必须要改为parent-first?

2021-01-26 Thread tison
又或者是反过来。

你可以尝试把 kafka connector 放到 /libs 里,自己应用打包里不带 kafka jar 也不带 kafka connector
jar,应该就可以不改配置。

Best,
tison.


tison  于2021年1月27日周三 下午2:47写道:

> 你是打包的时候自己打了 kafka 的依赖进去吗?看起来是应用里有一个 kafka 版本 A,接口
> org.apache.kafka.common.serialization.Serializer 用应用 classloader 加载,然后
> flink kafka connector 用集群 classloader 加载,继承自集群 classloader 里的
> org.apache.kafka.common.serialization.Serializer 导致这个问题。
>
> Best,
> tison.
>
>
> lp <973182...@qq.com> 于2021年1月27日周三 下午12:39写道:
>
>> 我写了一个 process
>>
>> function的demo,自定义source产生数据sink到kafka,然后发布到yarn集群运行,flink版本是1.11.2,采用application
>> Mode 部署,然后发现jobmanager-log报错: Failed to construct kafka producer;Caused
>> by:
>> org.apache.kafka.common.KafkaException: class
>> org.apache.kafka.common.serialization.ByteArraySerializer is not an
>> instance
>> of org.apache.kafka.common.serialization.Serializer。
>> 换了flink版本为1.12.1发现还是报这个错,后尝试采用per-job
>>
>> Mode部署发现是OK的。查资料发现是跟flink的类加载方式有关,即flink-conf.yml中的classloader.resolve-order参数,要将默认的
>>
>> child-first改成parent-first,修改后确实ok了,但是有个疑惑,为啥要改这个参数呢,看了官方文档,一般不建议改这个参数的,他避免了使用flink内置的类加载器,而是使用APP自己的。
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>


Re: 用application mode部署应用,classloader.resolve-order参数是否必须要改为parent-first?

2021-01-26 Thread tison
你是打包的时候自己打了 kafka 的依赖进去吗?看起来是应用里有一个 kafka 版本 A,接口
org.apache.kafka.common.serialization.Serializer 用应用 classloader 加载,然后
flink kafka connector 用集群 classloader 加载,继承自集群 classloader 里的
org.apache.kafka.common.serialization.Serializer 导致这个问题。

Best,
tison.


lp <973182...@qq.com> 于2021年1月27日周三 下午12:39写道:

> 我写了一个 process
>
> function的demo,自定义source产生数据sink到kafka,然后发布到yarn集群运行,flink版本是1.11.2,采用application
> Mode 部署,然后发现jobmanager-log报错: Failed to construct kafka producer;Caused by:
> org.apache.kafka.common.KafkaException: class
> org.apache.kafka.common.serialization.ByteArraySerializer is not an
> instance
> of org.apache.kafka.common.serialization.Serializer。
> 换了flink版本为1.12.1发现还是报这个错,后尝试采用per-job
>
> Mode部署发现是OK的。查资料发现是跟flink的类加载方式有关,即flink-conf.yml中的classloader.resolve-order参数,要将默认的
>
> child-first改成parent-first,修改后确实ok了,但是有个疑惑,为啥要改这个参数呢,看了官方文档,一般不建议改这个参数的,他避免了使用flink内置的类加载器,而是使用APP自己的。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 请教个Flink checkpoint的问题

2021-01-14 Thread tison
没明白你说的最近一次 checkpoint 被删除啥意思,你可以列一下 checkpoint 目录的内容,你觉得应该是啥,结果是啥。

Best,
tison.


Evan  于2021年1月14日周四 下午5:56写道:

> 代码图挂掉了,看不到代码
>
>
>
>
> 发件人: yinghua...@163.com
> 发送时间: 2021-01-14 17:26
> 收件人: user-zh
> 主题: 请教个Flink checkpoint的问题
>
> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
>
> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
>
>
> yinghua...@163.com
>


Re: flink 编译

2021-01-12 Thread tison
试试 mvn clean install -DskipTests -pl flink-runtime,flink-dist

Best,
tison.


penguin.  于2021年1月12日周二 下午9:44写道:

> Hi,
>
>
> 请问有人知道怎么单独编译flink-runtime模块吗?
> 然后这样是否能把更改的部分直接在flink-dist包中的org.apache.flink.runtime目录下进行替换?
> 整体编译一次实在太慢了。
> 谢谢!
>
>
> penguin


Re: flink算子类在多个subtask中是各自初始化1个实例对象吗?

2020-11-15 Thread tison
可以这么认为,大体上你可以认为每个并发有自己的环境。

技术上,算子对象是每个并发会实例化一个,而 static 变量的【共享】程度跟你设置的 slot per TM
值还有其他一些调度相关的参数有关,但是最好不要依赖这种实现层面的东西。

一种常见的误解是我创建一个 static HashMap 就神奇地拥有了全局的键值存储,这当然是不对的,只有在同一个 JVM 实例上也就是同一个 TM
上的任务才会看到同一个 HashMap 对象,而这几乎是不可控的。

可以看一下这篇文档[1]对物理部署的实际情况有一个基本的认知。

Best,
tison.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html


hl9...@126.com  于2020年11月16日周一 下午1:55写道:

> Hi,all:
>
> flink算子在多个并行度的job中,每个算子(假如1个算子都有一个对应的java类)在多个subtask中会共享1个java类实例吗?还是每个subtask都会各自的实例?
>
> 我做了一个简单测试,写了一个flatmap类,在构造方法和open方法中打印类的toString方法,发现输出都不同,是否可以证明每个subtask都初始化了自己的类实例?
> 希望有朋友能解释下算子在job运行中初始化的过程。
>
> 测试相关代码如下:
> // flink 1.10.2版本,并行度为3
> @Slf4j
> public class PersonFlatMap extends RichFlatMapFunction String>, Person> {
> private transient ValueState state;
>
> public PersonFlatMap(){
> log.info(String.format("PersonFlatMap【%s】:
> 创建实例",this.toString()));
> }
>
> @Override
> public void open(Configuration parameters) throws IOException {
> //略去无关代码...
> log.info(String.format("PersonFlatMap【%s】:初始化状态!",
> this.toString()));
> }
>
> @Override
> public void flatMap(Tuple2 t, Collector
> collector) throws Exception {
> Person p = JSONUtil.toObject(t.f1,Person.class);
> collector.collect(p);
> if(state.value() == null){state.update(0);}
> state.update(state.value() + 1);
> log.info("state: "+state.value());
> }
> }
>
> //测试日志输出
> ...
> flink-10.2 - [2020-11-16 13:41:54.360] - INFO  [main]
> com.toonyoo.operator.PersonFlatMap  -
> PersonFlatMap【com.toonyoo.operator.PersonFlatMap@ba8d91c】: 创建实例
> //此处略去无关日志...
> flink-10.2 - [2020-11-16 13:42:00.326] - INFO  [Flat Map -> Sink: Print to
> Std. Out (1/3)] org.apache.flink.runtime.state.heap.HeapKeyedStateBackend
> - Initializing heap keyed state backend with stream factory.
> flink-10.2 - [2020-11-16 13:42:00.351] - INFO  [Flat Map -> Sink: Print to
> Std. Out (1/3)] com.toonyoo.operator.PersonFlatMap  -
> PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c9d895d】:初始化状态!
> flink-10.2 - [2020-11-16 13:42:00.354] - INFO  [Flat Map -> Sink: Print to
> Std. Out (3/3)] com.toonyoo.operator.PersonFlatMap  -
> PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c489c40】:初始化状态!
> flink-10.2 - [2020-11-16 13:42:00.356] - INFO  [Flat Map -> Sink: Print to
> Std. Out (2/3)] com.toonyoo.operator.PersonFlatMap  -
> PersonFlatMap【com.toonyoo.operator.PersonFlatMap@1cd4f5c9】:初始化状态!
> ...
>
>
>
>
> hl9...@126.com
>


Re: Flink与Yarn的状态一致性问题

2020-11-12 Thread tison
detached 是另一个坑,因为你 attached 的时候需要等 client 去 request status 才会触发状态变化,但是普通的
execute 应该也是会自动的去拉结果的。

可以看下下列关键日志的打印情况

- log.info("Job {} reached globally terminal state {}.", ...)
- LOG.debug("Shutting down cluster because someone retrieved the job
result.");
- LOG.info("Shutting {} down with application status {}. Diagnostics {}.",
...)

Best,
tison.


JasonLee <17610775...@163.com> 于2020年11月13日周五 上午11:22写道:

> hi
> 1,首先确定你提交的是per-job模式吗?
> 2,你说的任务状态是说jm还在任务在failover,还是任务确实是挂了,jm已经退出了?
>
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink与Yarn的状态一致性问题

2020-11-12 Thread tison
PerJob 模式下,在作业完全挂掉之后,是会上报 YARN RM 应用自己失败的状态的。

当然,在 FLINK 察觉自己 FAILED 到上报给 YARN 是有一定的时延的,也有可能因为网络等问题上报失败。

你这个是短暂的不一致时间窗口,还是说 FLINK 集群已经退了,YARN 的状态还没有变化呢?

Best,
tison.


zhisheng  于2020年11月12日周四 下午8:17写道:

> 同遇见过这个问题,所以作业监控告警的时候一般都是拿作业所有的 task 级别的状态,而不是简单的 yarn 状态
>
> hdxg1101300123  于2020年11月12日周四 下午8:07写道:
>
> > 可以设置检查点失败任务也失败
> >
> >
> >
> > 发自vivo智能手机
> > > hi everyone,
> > >
> > > 最近在使用Flink-1.11.1 On Yarn Per
> > Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn
> > application仍处于运行状态
> > >
> > > 疑问是Flink任务处于Failed或Finished时,不会反馈自身状态给Yarn吗?期待大佬解惑,谢谢
> > >
> > > best,
> > > amenhub
>


Re: 订阅

2020-10-08 Thread tison
Please send email with any content to -subscr...@flink.apache.org
for subscription.

For example, mailto:user-zh-subscr...@flink.apache.org to subscribe
user-zh@flink.apache.org

Best,
tison.


葛春法-18667112979  于2020年10月8日周四 下午8:45写道:

> I want to subscribe flink mail.


Re: 订阅

2020-10-08 Thread tison
Please send email with any content to -subscr...@flink.apache.org
for subscription.

For example, mailto:user-zh-subscr...@flink.apache.org to subscribe
user...@flink.apache.org

Best,
tison.


葛春法-18667112979  于2020年10月8日周四 下午8:45写道:

> I want to subscribe flink mail.


Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-30 Thread tison
故障点的意思是从开始跌的地方重新消费吗?如果是这样那就是有问题,可以看看之前输出变少是正确数据输出慢了还是有些没输出了,慢了就得看看当时的环境,应该还是会有什么网络或者负载有波动的,没有可能就要怀疑监控系统有问题了;少输出了就是错了,可能是依赖的外部环境不稳定等等。

Best,
tison.


tison  于2020年9月30日周三 下午5:33写道:

> 那有审计/监控的话看下每个节点的 in/out 记录呗,总能看到是哪一步跌了...
>
> 照你现在提供的信息听起来一切正常那就是业务逻辑本身输出少了,不然总得有哪里不一样。如果只有 sink 跌了,那就是 sink
> 有问题,比如可能依赖了外部环境或者内部积累错误等等。
>
> Best,
> tison.
>
>
> Yang Peng  于2020年9月30日周三 下午5:26写道:
>
>> 感谢回复,是的,之前确实怀疑是业务逻辑导致的
>> 但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题
>>
>> tison  于2020年9月30日周三 下午2:33写道:
>>
>> > Hi Yang,
>> >
>> > 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么?
>> >
>> > 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。
>> >
>> > Best,
>> > tison.
>> >
>> >
>> > Yang Peng  于2020年9月30日周三 上午10:29写道:
>> >
>> > > 感谢回复,我们看了consumer的lag很小
>> > > 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的
>> > > 而且任务重启了没法jstack判断了
>> > >
>> > > hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道:
>> > >
>> > > >
>> > > >
>> > > >
>> > > > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
>> > > > 也可以 jstack 采下堆栈看下,GC等看下。
>> > > > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
>> > > > Best,
>> > > > Hailong Wang
>> > > > 在 2020-09-29 20:06:50,"Yang Peng"  写道:
>> > > >
>> > > >
>> > >
>> >
>> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
>> > > > >flinkkafkaconsumer消费的并行度也是90
>> 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
>> > > > >
>> > > > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
>> > > > >
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >> Hi Yang Peng:
>> > > > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
>> > > > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
>> > > > >> 2. Source 的序列化耗时严重,导致拉取变慢。
>> > > > >> 可以尝试着扩kafka 分区,加大Source并发看下。
>> > > > >> Best,
>> > > > >> Hailong Wang
>> > > > >>
>> > > > >> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
>> > > > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
>> > > > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
>> > > > >> >kafka消费没有积压,也没有反压,
>> > > > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
>> > > > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
>> > > > >>
>> > > >
>> > >
>> >
>>
>


Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-30 Thread tison
那有审计/监控的话看下每个节点的 in/out 记录呗,总能看到是哪一步跌了...

照你现在提供的信息听起来一切正常那就是业务逻辑本身输出少了,不然总得有哪里不一样。如果只有 sink 跌了,那就是 sink
有问题,比如可能依赖了外部环境或者内部积累错误等等。

Best,
tison.


Yang Peng  于2020年9月30日周三 下午5:26写道:

> 感谢回复,是的,之前确实怀疑是业务逻辑导致的
> 但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题
>
> tison  于2020年9月30日周三 下午2:33写道:
>
> > Hi Yang,
> >
> > 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么?
> >
> > 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。
> >
> > Best,
> > tison.
> >
> >
> > Yang Peng  于2020年9月30日周三 上午10:29写道:
> >
> > > 感谢回复,我们看了consumer的lag很小
> > > 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的
> > > 而且任务重启了没法jstack判断了
> > >
> > > hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道:
> > >
> > > >
> > > >
> > > >
> > > > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
> > > > 也可以 jstack 采下堆栈看下,GC等看下。
> > > > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
> > > > Best,
> > > > Hailong Wang
> > > > 在 2020-09-29 20:06:50,"Yang Peng"  写道:
> > > >
> > > >
> > >
> >
> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
> > > > >flinkkafkaconsumer消费的并行度也是90
> 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
> > > > >
> > > > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
> > > > >
> > > > >>
> > > > >>
> > > > >>
> > > > >> Hi Yang Peng:
> > > > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
> > > > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
> > > > >> 2. Source 的序列化耗时严重,导致拉取变慢。
> > > > >> 可以尝试着扩kafka 分区,加大Source并发看下。
> > > > >> Best,
> > > > >> Hailong Wang
> > > > >>
> > > > >> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
> > > > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
> > > > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
> > > > >> >kafka消费没有积压,也没有反压,
> > > > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
> > > > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
> > > > >>
> > > >
> > >
> >
>


Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

2020-09-30 Thread tison
Thanks for your explanation. It would be fine if only checking leadership &
actually write information is atomic.

Best,
tison.


Yang Wang  于2020年9月30日周三 下午3:57写道:

> Thanks till and tison for your comments.
>
> @Till Rohrmann 
> 1. I am afraid we could not do this if we are going to use fabric8
> Kubernetes client SDK for the leader election. The official Kubernetes Java
> client[1] also could not support it. Unless we implement a new
> LeaderElector in Flink based on the very basic Kubernetes API. But it seems
> that we could gain too much from this.
>
> 2. Yes, the implementation will be a little complicated if we want to
> completely eliminate the residual job graphs or checkpoints. Inspired by
> your suggestion, another different solution has come into my mind. We could
> use a same ConfigMap storing the JobManager leader, job graph,
> checkpoint-counter, checkpoint. Each job will have a specific ConfigMap for
> the HA meta storage. Then it will be easier to guarantee that only the
> leader could write the ConfigMap in a transactional operation. Since
> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
> transactional operation.
>
> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However,
> we still have the chances that two JobManager are running and trying to
> get/delete a key in the same ConfigMap concurrently. Imagine that the
> kubelet(like NodeManager in YARN) is down, and then the JobManager could
> not be deleted. A new JobManager pod will be launched. We are just in the
> similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only benefit
> is we do not need to implement a leader election/retrieval service.
>
> @tison
> Actually, I do not think we will have such issue in the Kubernetes HA
> service. In the Kubernetes LeaderElector[2], we have the leader information
> stored on the annotation of leader ConfigMap. So it would not happen the
> old leader could wrongly override the leader information. Once a JobManager
> want to write his leader information to the ConfigMap, it will check
> whether it is the leader now. If not, anything will happen. Moreover, the
> Kubernetes Resource Version[3] ensures that no one else has snuck in and
> written a different update while the client was in the process of
> performing its update.
>
>
> [1].
> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
> [2].
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
> <https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70>
> [3].
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>
>
> Best,
> Yang
>
> tison  于2020年9月30日周三 下午3:21写道:
>
>> Hi,
>>
>> Generally +1 for a native k8s HA service.
>>
>> For leader election & publish leader information, there was a
>> discussion[1]
>> pointed out that since these two actions is NOT atomic, there will be
>> always
>> edge case where a previous leader overwrite leader information, even with
>> versioned write. Versioned write helps on read again if version mismatches
>> so if we want version write works, information in the kv pair should help
>> the
>> contender reflects whether it is the current leader.
>>
>> The idea of writes leader information on contender node or something
>> equivalent makes sense but the details depends on how it is implemented.
>> General problems are that
>>
>> 1. TM might be a bit late before it updated correct leader information
>> but
>> only if the leader election process is short and leadership is stable at
>> most
>> time, it won't be a serious issue.
>> 2. The process TM extract leader information might be a bit more complex
>> than directly watching a fixed key.
>>
>> Atomic issue can be addressed if one leverages low APIs such as lease &
>> txn
>> but it causes more developing efforts. ConfigMap and encapsulated
>> interface,
>> thought, provides only a self-consistent mechanism which doesn't promise
>> more consistency for extension.
>>
>> Best,
>> tison.
>>
>> [1]
>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>
>>
>>
>>

Re: flink任务yarn perjob 提交任务如何设置job name

2020-09-30 Thread tison
代码里 env.execute("你的作业名")

Best,
tison.


丁浩浩 <18579099...@163.com> 于2020年9月30日周三 下午3:44写道:

> 如题,我需要设置flink提交到yarn的job name应该怎么设置呢?


Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

2020-09-30 Thread tison
Hi,

Generally +1 for a native k8s HA service.

For leader election & publish leader information, there was a discussion[1]
pointed out that since these two actions is NOT atomic, there will be always
edge case where a previous leader overwrite leader information, even with
versioned write. Versioned write helps on read again if version mismatches
so if we want version write works, information in the kv pair should help
the
contender reflects whether it is the current leader.

The idea of writes leader information on contender node or something
equivalent makes sense but the details depends on how it is implemented.
General problems are that

1. TM might be a bit late before it updated correct leader information but
only if the leader election process is short and leadership is stable at
most
time, it won't be a serious issue.
2. The process TM extract leader information might be a bit more complex
than directly watching a fixed key.

Atomic issue can be addressed if one leverages low APIs such as lease & txn
but it causes more developing efforts. ConfigMap and encapsulated interface,
thought, provides only a self-consistent mechanism which doesn't promise
more consistency for extension.

Best,
tison.

[1]
https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E



Till Rohrmann  于2020年9月29日周二 下午9:25写道:

> For 1. I was wondering whether we can't write the leader connection
> information directly when trying to obtain the leadership (trying to update
> the leader key with one's own value)? This might be a little detail,
> though.
>
> 2. Alright, so we are having a similar mechanism as we have in ZooKeeper
> with the ephemeral lock nodes. I guess that this complicates the
> implementation a bit, unfortunately.
>
> 3. Wouldn't the StatefulSet solution also work without a PV? One could
> configure a different persistent storage like HDFS or S3 for storing the
> checkpoints and job blobs like in the ZooKeeper case. The current benefit I
> see is that we avoid having to implement this multi locking mechanism in
> the ConfigMaps using the annotations because we can be sure that there is
> only a single leader at a time if I understood the guarantees of K8s
> correctly.
>
> Cheers,
> Till
>
> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang  wrote:
>
> > Hi Till, thanks for your valuable feedback.
> >
> > 1. Yes, leader election and storing leader information will use a same
> > ConfigMap. When a contender successfully performs a versioned annotation
> > update operation to the ConfigMap, it means that it has been elected as
> the
> > leader. And it will write the leader information in the callback of
> leader
> > elector[1]. The Kubernetes resource version will help us to avoid the
> > leader ConfigMap is wrongly updated.
> >
> > 2. The lock and release is really a valid concern. Actually in current
> > design, we could not guarantee that the node who tries to write his
> > ownership is the real leader. Who writes later, who is the owner. To
> > address this issue, we need to store all the owners of the key. Only when
> > the owner is empty, the specific key(means a checkpoint or job graph)
> could
> > be deleted. However, we may have a residual checkpoint or job graph when
> > the old JobManager crashed exceptionally and do not release the lock. To
> > solve this problem completely, we need a timestamp renew mechanism
> > for CompletedCheckpointStore and JobGraphStore, which could help us to
> the
> > check the JobManager timeout and then clean up the residual keys.
> >
> > 3. Frankly speaking, I am not against with this solution. However, in my
> > opinion, it is more like a temporary proposal. We could use StatefulSet
> to
> > avoid leader election and leader retrieval. But I am not sure whether
> > TaskManager could properly handle the situation that same hostname with
> > different IPs, because the JobManager failed and relaunched. Also we may
> > still have two JobManagers running in some corner cases(e.g. kubelet is
> > down but the pod is running). Another concern is we have a strong
> > dependency on the PersistentVolume(aka PV) in FileSystemHAService. But it
> > is not always true especially in self-build Kubernetes cluster. Moreover,
> > PV provider should guarantee that each PV could only be mounted once.
> Since
> > the native HA proposal could cover all the functionality of StatefulSet
> > proposal, that's why I prefer the former.
> >
> >
> > [1].
> >
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/le

Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-30 Thread tison
Hi Yang,

你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么?

如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。

Best,
tison.


Yang Peng  于2020年9月30日周三 上午10:29写道:

> 感谢回复,我们看了consumer的lag很小
> 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的
> 而且任务重启了没法jstack判断了
>
> hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道:
>
> >
> >
> >
> > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
> > 也可以 jstack 采下堆栈看下,GC等看下。
> > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
> > Best,
> > Hailong Wang
> > 在 2020-09-29 20:06:50,"Yang Peng"  写道:
> >
> >
> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
> > >flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
> > >
> > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
> > >
> > >>
> > >>
> > >>
> > >> Hi Yang Peng:
> > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
> > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
> > >> 2. Source 的序列化耗时严重,导致拉取变慢。
> > >> 可以尝试着扩kafka 分区,加大Source并发看下。
> > >> Best,
> > >> Hailong Wang
> > >>
> > >> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
> > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
> > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
> > >> >kafka消费没有积压,也没有反压,
> > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
> > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
> > >>
> >
>


Re: 请教二阶段提交问题

2020-09-26 Thread tison
> 可是再次提交没有意义啊,没有数据[捂脸哭]

这个事儿是这样的,你用 checkpoint 之后呢没有反过来确认的 commit 会留在 state 里,所以重启的时候重新加载 state
的时候就会再提交一遍。然后向 kafka 这一类存储 commit offset 是幂等的,发现已经 commit 过就跳过就 OK 了。

Best,
tison.


蒋佳成(Jiacheng Jiang) <920334...@qq.com> 于2020年9月26日周六 下午4:01写道:

>
> 两阶段提交的第一阶段提交中,事务参与者反馈ok后需要作出之后一定能提交事务的承诺,事务参与者需要做些事来兑现承诺比如将事务操作持久化。在FlinkKafkaProducer中,preCommit就是调用了KafkaProducer的flush将数据刷到kafka中,在整个checkpoint完成后再提交事务,如果提交失败,会在job重启时再次提交事务。因此,我们需要保证的是preCommit成功后commit一定要能成功,这个需要根据具体写入的存储提供的特性来完成。
>
>
>
>
> -- 原始邮件 --
> 发件人: "高亮" 发送时间: 2020年9月25日(星期五) 中午11:14
> 收件人: "user-zh" 主题: 请教二阶段提交问题
>
>
>
> 各位大佬,请教一下二阶段提交的问题,preCommit预提交失败回滚已经很清楚了,就是在commit阶段提交如果失败会怎么,比较迷惑。
>
>
>
> 我自己测试了一下,发现只要是commit失败会造成数据丢失,但是看了下方法注释,说是失败了后会重启flink恢复到最近的state,继续提交,可是我在程序里有专门打印source输入的流数据,发现没有按到任何数据进入,也就是说flink重启后就直接调用commit再次提交。
>
>
> 可是再次提交没有意义啊,没有数据[捂脸哭]
>
>
> 所以请教大佬,当commit出现异常后,flink内部是如何解决的,作为flink应用者,如何正确使用避免和解决这类问题!


Re: 编译Flink时找不到scala-maven-plugin:3.1.4

2020-09-23 Thread tison
从日志看你的 scala 是 2.10 版本的,比较新版本的 flink 应该都只支持 2.11 和 2.12

Best,
tison.


Natasha <13631230...@163.com> 于2020年9月23日周三 下午4:00写道:

> Hi All,
> 很高兴加入Flink这个大家庭!但是有个问题困扰了我好久!
> 当我导入Flink到IDEA中准备进行编译,输入“mvn clean install -Drat.skip=true
> -Dmaven.test.skip=true -Dmaven.javadoc.skip=true -Dcheckstyle.skip=true”后,
> 报错“Failed to execute goal
> net.alchim31.maven:scala-maven-plugin:3.1.4:testCompile
> (scala-test-compile) on project flink-runtime_2.10”,
>
> 我尝试了网上的方法,修改pom.xml文件中scala-maven-plugin的版本,或者是让IDEA的Scala版本与Windows的Scala版本保持一致,但是都不起作用!
>
>
>
>
> Best,
> Natasha
>
>
> | |
> Natasha
> |
> |
> |
> 签名由网易邮箱大师定制


Re: 一个main方法启动2个yarn job问题

2020-08-28 Thread tison
应该说 SQL 的 update 会在底层也 call 一次 env.execute

如果你配的是所谓的 detach 模式,是有这种可能的。这个是实现问题,你可以先贴一下代码,然后描述你要的行为,看下可以怎么写

Best,
tison.


Rui Li  于2020年8月28日周五 下午9:59写道:

> 作业代码是怎么写的啊?按说写SQL的话不需要执行Env.execute
>
> On Fri, Aug 28, 2020 at 9:41 AM air23  wrote:
>
> > 你好。我有一个接kafka 写入tidb的任务 为什么会启动2个yarn任务去运行呢?
> > 我是先用datastream 接入kafka。然后转成table sql写入到tidb
> > 2个job name 一个叫Env.execute配置的名字
> > 一个是叫insert 写入tidb的sql语句名字
> >
> >
>
> --
> Best regards!
> Rui Li
>


Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-12 Thread tison
那就要看下你是什么 Flink 版本,怎么提交到 YARN 上的,以及 YARN 的日志上的 classpath 是啥了

Best,
tison.


王松  于2020年7月13日周一 下午12:54写道:

> 各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
> classpath.
> 请问是什么原因导致的呢?
>
> 代码如下:
>
>
> -
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings =
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env,
> settings);
>
> tenv.executeSql("CREATE TABLE test_table (\n" +
> " id INT,\n" +
> " name STRING,\n" +
> " age INT,\n" +
> " create_at TIMESTAMP(3)\n" +
> ") WITH (\n" +
> " 'connector' = 'kafka',\n" +
> " 'topic' = 'test_json',\n" +
> " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
> " 'properties.group.id' = 'testGroup',\n" +
> " 'format' = 'json',\n" +
> " 'scan.startup.mode' = 'latest-offset'\n" +
> ")");
> Table table = tenv.sqlQuery("select * from test_table");
> tenv.toRetractStream(table, Row.class).print();
> env.execute("flink 1.11.0 demo");
>
> -
>
> pom 文件如下:
> =
> 
> 2.11
> 1.11.0
> 
> 
> 
> org.apache.flink
>
> flink-table-planner-blink_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
>
> flink-table-runtime-blink_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
> flink-json
> ${flink.version}
> 
> 
> org.apache.flink
>
> flink-connector-kafka_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
>
> flink-connector-kafka-0.11_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
>
> flink-connector-kafka_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
> flink-core
> ${flink.version}
> 
> 
> org.apache.flink
> flink-clients_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
> flink-table-common
> ${flink.version}
> 
> 
> =
>


Re: Flink DataStream 统计UV问题

2020-07-09 Thread tison
你这个需求貌似是要看一天的 UV 的实时更新量,可以看一下 sliding window。如果是每天 0 点清零,实时看今天的
UV,那就是另一个问题了,应该需要自己定义 trigger & evictor

每条触发一次 window...看你数据量吧

Best,
tison.


shizk233  于2020年7月10日周五 上午10:23写道:

> Hi Jiazhi,
>
>
> 1.如果数据流量不是很大的话,按每条数据触发也没问题。另外,基于事件时间的情况,提前触发可以选择ContinuousEventTimeTrigger,可以查看Trigger接口的实现找到你想要的trigger。
> 2.窗口结束后会自动释放。一般对于Global窗口需要手动设置TTL
>
> Best,
> shizk233
>
> ゞ野蠻遊戲χ  于2020年7月7日周二 下午10:27写道:
>
> > 大家好!
> >
> >  想问下,现在在用DataStream的api来统计每天的UV,代码如下,有2个使用问题:
> > 1、在使用Tumbling窗口的时候,由于使用窗口跨度是1天(Time.days(1)),只有以一天结束的时候,才能输出一个uv值,
> > 这样时间等待太长了,所以加了一个trigger,每来一条都触发一次窗口,不知道这样的用法没有问题。
> > 2、还有想问下在窗口结束后,里面的state状态会自动释放吗?还是要自己手动设置TTL的。
> >
> >
> > 谢谢!
> > Jiazhi
> >
>


Re: flink 高可用问题

2020-06-22 Thread tison
你看一下你的 chk 间隔,看起来是作业还没调度起来就开始 chk 所以失败。可能原因资源不足,调度不起来或者调度得慢,你 chk 间隔又小,就这样了。

如果是一直 chk 以这个方式失败,应该看下调度的日志为啥迟迟调不起来

Best,
tison.


Yichao Yang <1048262...@qq.com> 于2020年6月22日周一 上午10:57写道:

> Hi
>
>
> 看日志应该只是INFO,而不是错误,你的job是做不了checkpoint吗?
>
>
> Best,
> Yichao Yang
>
>
>
>
> --原始邮件--
> 发件人:"Tony" 发送时间:2020年6月22日(星期一) 上午10:54
> 收件人:"user-zh"
> 主题:flink 高可用问题
>
>
>
> 你好。
>
>
> 我按着官方文档配置了flink的高可用(flink-conf.yaml)如下:
> high-availability:zookeeper
> high-availability.zookeeper.quorum:master:2181 ,slave1:2181,slave2:2181
> high-availability.zookeeper.path.root:/flink
> high-availability.cluster-id:/cluster_one
> highavailability.storageDir:hdfs://master:9000/flink/ha
>
>
> 我的flink和zookeeper都是在K8s的容器中
> job启动出现如下问题:麻烦帮忙看一下,谢谢。
> 2020-06-22 02:47:43,884 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Checkpoint triggering task Source:Kafka-Consumer - (Sink: Print to
> Std. Out, Filter -Query Map - Unwind - Custom Map - filter
> - Data Transformation - Filter) (1/1) of job
>  is not in state RUNNING but SCHEDULED
> instead. Aborting checkpoint.


Re: flink精准一次性消费问题

2020-06-11 Thread tison
>checkpoint的配置有什么要求吗?

配成 EXACTLY_ONCE

>还有就是kafka的事务提交多久能提交一次,可配置吗?

chk 的时候提交,这里面深究的话有点并发问题,可以看 TwoPhaseCommitSink 的细节
配置这个事儿...有能力自定义,但是为啥要这么做呢呢

Best,
tison.


蒋佳成(Jiacheng Jiang) <920334...@qq.com> 于2020年6月11日周四 下午4:59写道:

> checkpoint完成通知里提交的事务
>
>
>
>
> --原始邮件--
> 发件人: "胡云川" 发送时间: 2020年6月11日(星期四) 下午4:56
> 收件人: "user-zh" 主题: 回复:flink精准一次性消费问题
>
>
>
> gt;Hi
> gt;这些问题都已经排查过了,
> gt;有一个问题,在做exctly-once的时候,
> gt;checkpoint的配置有什么要求吗?
> gt;还有就是kafka的事务提交多久能提交一次,可配置吗?
> gt;望解答,谢谢各位!
>
>
>
>
>
>
> --nbsp;原始邮件nbsp;--
> 发件人:nbsp;"Matt Wang" 发送时间:nbsp;2020年6月10日(星期三) 晚上7:39
> 收件人:nbsp;"user-zh@flink.apache.org"
> 主题:nbsp;Re:flink精准一次性消费问题
>
>
>
> kafka 从 0.11.0 开始支持事务写,在 flink 中如果开启了 EXACTLY-ONCE,数据会先 send 到 kafka,但在未调用
> commit 之前,这部分数据是数据是属于未完成事务的数据,站在 kafka
> 的角度,数据还是会存储下来的,只不过下游在消费的时候,根据nbsp; isolation.level 设置来决定是否能消费到未 commit
> 的数据。
>
>
> ---
> Best,
> Matt Wang
>
>
> On 06/10/2020 14:28,Yichao Yang<1048262...@qq.comgt; wrote:
> Hi
>
>
> sinkamp;nbsp;
> 为kafka时,需要kafka版本大于1.0,并且kafka端也要开启两阶段提交功能才能满足EXACTLY-ONCE。可以检查下你的配置是否都满足。
>
>
> Best,
> Yichao Yang
>
>
>
>
> --amp;nbsp;原始邮件amp;nbsp;--
> 发件人:amp;nbsp;"胡云川" 发送时间:amp;nbsp;2020年6月10日(星期三) 下午2:25
> 收件人:amp;nbsp;"user-zh"
> 主题:amp;nbsp;flink精准一次性消费问题
>
>
>
> amp;amp;gt;Hi,
> amp;amp;gt;在使用flink往kafka写入数据时,使用了EXACTLY-ONCE,但是
>
> amp;amp;gt;在debug测试的时候,发现数据在invoke方法里的traction.producer.send()的时候数据就已经传过去了,没有通过precommit和commit方法
> amp;amp;gt;请问大家可以解释一下吗?谢谢!


Re: flink JobManager HA 异常 the fencing token is null

2020-06-09 Thread tison
噢,那应该就是上面说的问题了

你的 Dispatcher 能被发现说明一开始选主和发布是 ok 的,你可以贴一下 HA
的配置,看看有没特别不靠谱的,然后去日志里找一下丢 leadership 的日志,一般来说前后会有一堆 zk 链接 ConnectionLoss 或者
SessionExpire 的日志

Best,
tison.


whirly  于2020年6月9日周二 下午9:23写道:

> Flink 1.8
>
>
>
>
> | |
> whirly
> |
> |
> 邮箱:whir...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年06月09日 21:15,tison 写道:
> 啥 flink 版本啊?1.10 Dispatcher 魔改之后应该不会 null 的
>
> Best,
> tison.
>
>
> whirly  于2020年6月9日周二 下午8:58写道:
>
> > 大家好:
> > 环境一些作业出现下面异常,怀疑是Flink JobManager配置HA的问题,请问可能是什么问题?
> >
> >
> > 异常信息:
> > Internal server error.,
> >  > side:\norg.apache.flink.runtime.rpc.exceptions.FencingTokenException:
> >  Fencing token not set: Ignoring message
> > LocalFencedMessage(93428232915e7c6e4947aaa5910341a8,
> > LocalRpcInvocation(requestMultipleJobDetails(Time)))
> > sent to akka.tcp://flink@cluster2-host1:40443/user/dispatcher because
> the
> > fencing token is null.
> > at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> > at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> > at
> >
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> > at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> > at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> > at
> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > End of exception on server side>
>


Re: flink JobManager HA 异常 the fencing token is null

2020-06-09 Thread tison
你可以详细说一下场景,这个我想了一下应该是你选举窗口太长了

0. 某个时候,Dispatcher 选出了 Leader 并发布自己的地址
1. 某个组件向 Dispatcher 发了个消息,你这里前端点击之后后端 WebMonitor 给 Dispatcher 发
requestMultipleJobDetails
消息
2. Dispatcher 跟 zk 链接抖动,丢 leader 了。早期版本会把这个 fencing token 设置成 null
3. 1 里面的消息到达 Dispatcher,Dispatcher 走 fencing token 逻辑,看到是 null
4. 抛出此异常

如果稍后又选举成功,这里的异常应该是 fencing token mismatch 一类的

Best,
tison.


tison  于2020年6月9日周二 下午9:15写道:

> 啥 flink 版本啊?1.10 Dispatcher 魔改之后应该不会 null 的
>
> Best,
> tison.
>
>
> whirly  于2020年6月9日周二 下午8:58写道:
>
>> 大家好:
>> 环境一些作业出现下面异常,怀疑是Flink JobManager配置HA的问题,请问可能是什么问题?
>>
>>
>> 异常信息:
>> Internal server error.,
>> > side:\norg.apache.flink.runtime.rpc.exceptions.FencingTokenException:
>>  Fencing token not set: Ignoring message
>> LocalFencedMessage(93428232915e7c6e4947aaa5910341a8,
>> LocalRpcInvocation(requestMultipleJobDetails(Time)))
>> sent to akka.tcp://flink@cluster2-host1:40443/user/dispatcher because
>> the fencing token is null.
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> End of exception on server side>
>
>


Re: flink JobManager HA 异常 the fencing token is null

2020-06-09 Thread tison
啥 flink 版本啊?1.10 Dispatcher 魔改之后应该不会 null 的

Best,
tison.


whirly  于2020年6月9日周二 下午8:58写道:

> 大家好:
> 环境一些作业出现下面异常,怀疑是Flink JobManager配置HA的问题,请问可能是什么问题?
>
>
> 异常信息:
> Internal server error.,
>  side:\norg.apache.flink.runtime.rpc.exceptions.FencingTokenException:
>  Fencing token not set: Ignoring message
> LocalFencedMessage(93428232915e7c6e4947aaa5910341a8,
> LocalRpcInvocation(requestMultipleJobDetails(Time)))
> sent to akka.tcp://flink@cluster2-host1:40443/user/dispatcher because the
> fencing token is null.
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> End of exception on server side>


Re: 订阅中文邮件列表

2020-06-02 Thread tison
请发送任意邮件到 user-zh-subscr...@flink.apache.org 订阅。

Best,
tison.


li wei  于2020年6月2日周二 下午7:36写道:

> 中文邮件列表
>


Re: Kafka Consumer反序列化错问题

2020-05-29 Thread tison
另外关于类加载的一般性文档,可以看下这个

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html

Best,
tison.


tison  于2020年5月29日周五 下午7:46写道:

> 这个原因应该是类加载的顺序问题,你配置一下 child-first 的类加载,如果是 perjob 1.10 上我记得是要指定某个配置。
>
> 参考这个文档
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#user-jars--classpath
>
> Best,
> tison.
>
>
> Even <452232...@qq.com> 于2020年5月29日周五 下午6:48写道:
>
>> 谢谢,请问需要怎么处理避免这个问题?
>>
>>
>>
>>
>> --原始邮件--
>> 发件人:"zz zhang"> 发送时间:2020年5月29日(星期五) 下午5:16
>> 收件人:"user-zh"> jkill...@dingtalk.com;
>>
>> 主题:Re: Kafka Consumer反序列化错问题
>>
>>
>>
>> 应该是maven-shade那边配置问题,
>>
>> 原始class文件org.apache.kafka.common.serialization.ByteArrayDeserializer被改写为org.
>> apache.flink.kafka.shaded.org
>> .apache.kafka.common.serialization.ByteArrayDeserializer之后,所实现的接口Deserializer也被shade处理了,可能被是org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer,所以导致异常
>>
>> 夏帅 > 
>>  可以排除一下是否是jar包冲突
>> 
>> 
>>  --
>>  发件人:Even <452232...@qq.com
>>  发送时间:2020年5月29日(星期五) 16:17
>>  收件人:user-zh >  主 题:Kafka Consumer反序列化错问题
>> 
>>  Hi!
>>  请教一个Kafka Consumer反序列问题:
>>  一个kafkanbsp;consumernbsp;job 提交到Flink session
>> cluster时运行稳定,但是独立提交到到Flink per-job cluster 就报kafka反序列化错,报错信息如下:
>>  其中flink版本为1.10,kafka版本为kafka_2.12-2.1.0;代码中consumer配置为val data =
>> env.addSource(new FlinkKafkaConsumer[String](topic, new
>> SimpleStringSchema(), properties))
>>  2020-05-27nbsp;17:05:22
>>  org.apache.kafka.common.KafkaException: Failed to construct kafka
>> consumer
>>  at
>> org.apache.kafka.clients.consumer.KafkaConsumer.>  at
>> org.apache.kafka.clients.consumer.KafkaConsumer.>  at
>> org.apache.kafka.clients.consumer.KafkaConsumer.>  at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>>  at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>>  at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
>>  at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>  at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>>  at
>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>  at java.lang.Thread.run(Thread.java:748)
>>  Caused by: org.apache.kafka.common.KafkaException:
>> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
>> is not an instance of org.apache.kafka.common.serialization.Deserializer
>>  at
>> org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
>>  at
>> org.apache.kafka.clients.consumer.KafkaConsumer.>  ... 15 more
>>
>>
>>
>> --
>> Best,
>> zz zhang
>
>


Re: Kafka Consumer反序列化错问题

2020-05-29 Thread tison
这个原因应该是类加载的顺序问题,你配置一下 child-first 的类加载,如果是 perjob 1.10 上我记得是要指定某个配置。

参考这个文档
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#user-jars--classpath

Best,
tison.


Even <452232...@qq.com> 于2020年5月29日周五 下午6:48写道:

> 谢谢,请问需要怎么处理避免这个问题?
>
>
>
>
> --原始邮件--
> 发件人:"zz zhang" 发送时间:2020年5月29日(星期五) 下午5:16
> 收件人:"user-zh" jkill...@dingtalk.com;
>
> 主题:Re: Kafka Consumer反序列化错问题
>
>
>
> 应该是maven-shade那边配置问题,
>
> 原始class文件org.apache.kafka.common.serialization.ByteArrayDeserializer被改写为org.
> apache.flink.kafka.shaded.org
> .apache.kafka.common.serialization.ByteArrayDeserializer之后,所实现的接口Deserializer也被shade处理了,可能被是org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer,所以导致异常
>
> 夏帅  
>  可以排除一下是否是jar包冲突
> 
> 
>  --
>  发件人:Even <452232...@qq.com
>  发送时间:2020年5月29日(星期五) 16:17
>  收件人:user-zh   主 题:Kafka Consumer反序列化错问题
> 
>  Hi!
>  请教一个Kafka Consumer反序列问题:
>  一个kafkanbsp;consumernbsp;job 提交到Flink session
> cluster时运行稳定,但是独立提交到到Flink per-job cluster 就报kafka反序列化错,报错信息如下:
>  其中flink版本为1.10,kafka版本为kafka_2.12-2.1.0;代码中consumer配置为val data =
> env.addSource(new FlinkKafkaConsumer[String](topic, new
> SimpleStringSchema(), properties))
>  2020-05-27nbsp;17:05:22
>  org.apache.kafka.common.KafkaException: Failed to construct kafka
> consumer
>  at
> org.apache.kafka.clients.consumer.KafkaConsumer.  at
> org.apache.kafka.clients.consumer.KafkaConsumer.  at
> org.apache.kafka.clients.consumer.KafkaConsumer.  at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>  at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>  at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
>  at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>  at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: org.apache.kafka.common.KafkaException:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
> is not an instance of org.apache.kafka.common.serialization.Deserializer
>  at
> org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
>  at
> org.apache.kafka.clients.consumer.KafkaConsumer.  ... 15 more
>
>
>
> --
> Best,
> zz zhang


Re: Re: flink1.10 on yarn 问题

2020-05-29 Thread tison
你运行的命令是啥?然后在哪个目录下运行的,和 flink 下载下来解压的目录是什么相对关系?

Best,
tison.


air23  于2020年5月29日周五 下午2:35写道:

> 代码就是flink自带的例子。
>
> public class WordCountStreamingByJava {
> public static void main(String[] args) throws Exception {
>
> // 创建执行环境
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> // 设置socket数据源
> DataStreamSource source = env.socketTextStream("zongteng75", 9001,
> "\n");
>
> // 转化处理数据
> DataStream dataStream = source.flatMap(new
> FlatMapFunction() {
> @Override
> public void flatMap(String line, Collector collector)
> throws Exception {
>
> System.out.println(line);
> for (String word : line.split(" ")) {
> collector.collect(new WordWithCount(word, 1));
> }
> }
> }).keyBy("word")//以key分组统计
> .timeWindow(Time.seconds(2),Time.seconds(2))//设置一个窗口函数,模拟数据流动
> .sum("count");//计算时间窗口内的词语个数
>
> // 输出数据到目的端
> dataStream.print();
>
> // 执行任务操作
> env.execute("Flink Streaming Word Count By Java");
>
> }
>
>
>
>
> 我现在加了flink环境变量 这个例子 可以过了。就很奇怪
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-05-29 14:22:39,"tison"  写道:
> >然后你 execute 前后的代码片段甚至整个 main 如果可以的话通过 gist 贴一下(x)
> >
> >Best,
> >tison.
> >
> >
> >tison  于2020年5月29日周五 下午2:21写道:
> >
> >> 这个问题好诡异啊,一般来说编译会在 env.execute
> >> 的时候拦截,不应该真的调度起来才对。你能详细描述一下你提交作业的方法还有这个错误报在哪里吗(client?cluster?)?
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> air23  于2020年5月29日周五 下午1:38写道:
> >>
> >>> cdh运行flink1.10 on cdh yarn 报错如下。 用1.7.2版本就没有问题
> >>> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar 也加了
> >>> hadoop环境变量 export HADOOP_CONF_DIR=/etc/hadoop/conf
> >>> 求解答
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> org.apache.flink.client.program.ProgramInvocationException: The main
> >>> method caused an error:
> >>> org.apache.flink.client.program.ProgramInvocationException: Job failed
> >>> (JobID: e358699c1be6be1472078771e1fd027f)
> >>>
> >>> at
> >>>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >>>
> >>> at
> >>>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >>>
> >>> at
> >>>
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> >>>
> >>> at
> >>>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
> >>>
> >>> at
> >>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> >>>
> >>> at
> >>>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
> >>>
> >>> at
> >>>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> >>>
> >>> at java.security.AccessController.doPrivileged(Native Method)
> >>>
> >>> at javax.security.auth.Subject.doAs(Subject.java:422)
> >>>
> >>> at
> >>>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >>>
> >>> at
> >>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> >>>
> >>> Caused by: java.util.concurrent.ExecutionException:
> >>> org.apache.flink.client.program.ProgramInvocationException: Job failed
> >>> (JobID: e358699c1be6be1472078771e1fd027f)
> >>>
> >>> at
> >>>
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> >>>
> >>> at
> >>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> >>>
> >>> at
> >>>
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
> >>>
> >>> at
> >>>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.e

Re: flink1.10 on yarn 问题

2020-05-29 Thread tison
然后你 execute 前后的代码片段甚至整个 main 如果可以的话通过 gist 贴一下(x)

Best,
tison.


tison  于2020年5月29日周五 下午2:21写道:

> 这个问题好诡异啊,一般来说编译会在 env.execute
> 的时候拦截,不应该真的调度起来才对。你能详细描述一下你提交作业的方法还有这个错误报在哪里吗(client?cluster?)?
>
> Best,
> tison.
>
>
> air23  于2020年5月29日周五 下午1:38写道:
>
>> cdh运行flink1.10 on cdh yarn 报错如下。 用1.7.2版本就没有问题
>> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar 也加了
>> hadoop环境变量 export HADOOP_CONF_DIR=/etc/hadoop/conf
>> 求解答
>>
>>
>>
>>
>>
>>
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error:
>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>> (JobID: e358699c1be6be1472078771e1fd027f)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>>
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
>>
>> Caused by: java.util.concurrent.ExecutionException:
>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>> (JobID: e358699c1be6be1472078771e1fd027f)
>>
>> at
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>
>> at
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>
>> at
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>>
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>
>> at
>> tt.WordCountStreamingByJava.main(WordCountStreamingByJava.java:36)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:498)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>
>> ... 11 more
>>
>> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>> Job failed (JobID: e358699c1be6be1472078771e1fd027f)
>>
>> at
>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>>
>> at
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>>
>> at
>> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
>>
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>>
>>

Re: 向flink push代码

2020-05-27 Thread tison
Flink 的特点就是快(x)

Best,
tison.


宇张  于2020年5月28日周四 上午10:56写道:

> 感谢大佬们,我看到  Leonard Xu大佬已经关注了FLINK-17991
> <https://issues.apache.org/jira/browse/FLINK-17991>这个,好快的响应速度
>
> On Thu, May 28, 2020 at 10:25 AM Leonard Xu  wrote:
>
> > Hi,
> > Yangze 贴了官方教程,也可以看下 Jark 的博客[1],中文的看起来会快一些。
> >
> > Best,
> > Leonard Xu
> > [1] https://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/ <
> > https://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/>
> >
> > > 在 2020年5月28日,10:18,Yangze Guo  写道:
> > >
> > > 您好,社区的贡献代码教程[1]。
> > >
> > > Tips: 如果不是hotfix,请提交前确认已经在ticket下达成一致并且有committer将其assign给您。
> > >
> > > [1] https://flink.apache.org/zh/contributing/contribute-code.html
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Thu, May 28, 2020 at 10:15 AM 宇张  wrote:
> > >>
> > >> 找打了教程了
> > >>
> > >>
> > >> On Thu, May 28, 2020 at 10:10 AM 宇张  wrote:
> > >>
> > >>> hi,
> > >>> 请问给flink社区push代码的流程是怎么样的哪,有没有小白教程啊。。
> > >>>
> >
> >
>


Re: flink 1.10webui不显示print内容

2020-05-26 Thread tison
你这个程序看起来不能通过 Web UI 提交。Flink 依赖内部异常在 Web UI 提交的路径里做编译。你这直接 Catch 了是拿不到作业图的。

你这个作业真的起来了吗?

具体提交的操作怎么样的,除了你要的 taskmanager.out 没有,有啥?

Best,
tison.


smq <374060...@qq.com> 于2020年5月27日周三 上午7:34写道:

> FlinkKafkaConsumer011 FlinkKafkaConsumer011<(topic, new SimpleStringSchema(), properties);
> consumer.setStartFromLatest();
> DataStreamSource SingleOutputStreamOperator stream.map(
> new MapFunction int num = 0;
>
> @Override
> public Tuple2 num++;
> if (num % 10 == 0) {
>
> System.out.println("出现错误,即将重启");
> throw new RuntimeException("出现错误,程序重启!");
> } else {
> return new Tuple2(s, 1);
> }
> }
> }).keyBy(0)
> .sum(1);
>
> sum.print();
> try {
> env.execute();
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
> 这个是部分代码
>
>
> --原始邮件--
> 发件人:"Benchao Li" 发送时间:2020年5月26日(星期二) 晚上11:29
> 收件人:"user-zh"
> 主题:Re: flink 1.10webui不显示print内容
>
>
>
> 你可以给我们看一下你是怎么print的么?
>
> smq <374060...@qq.com 于2020年5月26日周二 下午11:20写道:
>
>  我这个在集群上提交或者webui提交都看不到输出内容,这应该不是client吧
> 
> 
>  ---原始邮件---
>  发件人: quot;Lijie Wangquot;  发送时间: 2020年5月26日(周二) 晚上10:14
>  收件人: quot;user-zh@flink.apache.orgquot;<
> user-zh@flink.apache.orggt;;
>  主题: 回复:flink 1.10webui不显示print内容
> 
> 
>  这个是不需要配置并且所有版本都支持的,你可以看一下 taskmanager.out 的输出内容。 此外,你需要确认一下你 print
>  的逻辑是否属于在 TM 端执行,有可能是在 client 端被执行的。
> 
> 
> 
> 
>  在2020年05月26日 21:39,smq<374060...@qq.comgt; 写道:
>  Hi
>  我的代码中打印的结果不能在webui上stdout看到,但是网上看的博客有人是可以显示打印内容的,只不过不是1.10版本。
>  请问是配置的问题还是这个版本不支持呢
>
>
>
> --
>
> Best,
> Benchao Li


Re: 全局state

2020-05-26 Thread tison
任意并行度全局状态从物理上就是不可行的,你可以了解一下分布式计算系统怎么部署物理作业的。“全局状态”要么依赖外部存储要么依赖实现(部署)细节。

你这个需求能不能自定义 KeyBy 细节(KeySelector)来实现?相关文档见
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/api_concepts.html#define-keys-using-key-selector-functions

Best,
tison.


star <3149768...@qq.com> 于2020年5月26日周二 下午6:42写道:

> 请问,有全局状态组件吗?我有一个需求需要对数据里的id和name做映射,也就是如果两条数据的id或者name相同则映射成一个值;现在只能使用operator
> state,并且并行度设置为1,来实现全局state
>
>
> 谢谢
>
> 发自我的iPhone


Re: 关于水位线Watermark的理解

2020-05-26 Thread tison
最近刚好看到张俊老师的 Flink 分享 [1],这个里面对你想了解的部分介绍得很详细,可以结合阅读(x)

Best,
tison.

[1] https://files.alicdn.com/tpsservice/73a1f1c404d2a658585cf4f4d86ef776.pdf


smq <374060...@qq.com> 于2020年5月24日周日 下午10:25写道:

> 恩恩,我是刚接触flink不久,所以很多地方没有很清楚,谢谢指点
>
>
> ---原始邮件---
> 发件人: tison 发送时间: 2020年5月24日(周日) 晚上10:10
> 收件人: user-zh 主题: Re: 关于水位线Watermark的理解
>
>
> 整体没啥问题,但是我看你说【假如第一个数据的事件时间刚好为12:00的,那么此时水位线应该在11:59】,这个 Watermark 跟
> allowedLateness 没啥关系哈,是独立的逻辑。
>
> 文档层面你可以看看[1],源码你可以看看[2]里面检索 allowedLateness
>
> Best,
> tison.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#allowed-lateness
> [2]
>
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
>
>
>
> Benchao Li 
>  Hi,
>  你理解的是正确的,进入哪个窗口完全看事件时间,窗口什么时候trigger,是看watermark。
> 
>  smq <374060...@qq.com 于2020年5月24日周日 下午9:46写道:
> 
>  
>  
> 
> 使用时间时间窗口处理关于数据延迟,加入允许延迟时间为1min,窗口大小是10min,那么在12:00-12:10这个窗口中,如果事件时间是在12:09:50这个数据在12:10:50这个数据到达,并且此时水位线刚好在12:09:50,那么这个延迟数据可以被处理,这个可以理解。
>  
>  
> 
> 但是,假如第一个数据的事件时间刚好为12:00的,那么此时水位线应该在11:59,这个数据能进入12:00-12:10这个窗口被处理吗。按道理来说应该被正确处理。那么这样的话,进入窗口是按照事件时间,触发是按照水印时间。不知道这么理解对不对,这个问题想了很久。
> 
> 
> 
>  --
> 
>  Best,
>  Benchao Li
> 


Re: java.lang.NoSuchMethodError while writing to Kafka from Flink

2020-05-24 Thread tison
Could you try to download binary dist from flink download page and
re-execute the job? It seems like something wrong with flink-dist.jar.

BTW, please post user question on only user mailing list(not dev).

Best,
tison.


Guowei Ma  于2020年5月25日周一 上午10:49写道:

> Hi
> 1. You could check whether the 'org.apache.flink.api.java.clean' is in
> your classpath first.
> 2. Do you follow the doc[1] to deploy your local cluster and run some
> existed examples such as WordCount?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/cluster_setup.html
> Best,
> Guowei
>


Re: RichMapFunction的问题

2020-05-24 Thread tison
关于第一个问题,最好细化一下【各种问题】是什么问题。

关于第二个问题,我印象中目前 Flink 不支持按并发(SubTask)级别指定调度的位置,绕过方案可以是设置每个 TM 仅持有一个
Slot。这方面我抄送 Xintong,或许他的工作能帮到你。

Best,
tison.


xue...@outlook.com  于2020年5月25日周一 上午11:29写道:

> 遇到两个问题:
>   背景:flink v1.10集群,几十台主机,每台CPU 16,内存 50G,整个job的并发是200
>   比如我的一个RichMapFunction在open中会加载存量数据。
>   因维度数据和主数据是非常离散的,因此这些维度数据都需要加载到内存
>
> 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够;
>
>
> 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上;
>
> 说简单点:
>
> 1、 对于RichMapFunction的open中需要加载大量维度数据,并发度上不去受什么影响;
>
> 2、 对于一个算子如何干预使其分散到不同的taskmanager上;
>
>
>
>
> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用
>
>


Re: 使用广播流要怎么保证广播流比数据流先到?

2020-05-24 Thread tison
高老师的方案应该是比较 make sense 的,你从网络上去限制某个先到后到很麻烦,而且就算可以,也会涉及 Flink
网络层很底层的逻辑。通常来说希望【先到】的含义是【先处理】,那你把物理上先到的缓存起来后处理就可以了。

Best,
tison.


1048262223 <1048262...@qq.com> 于2020年5月24日周日 下午2:08写道:

> Hello,我的理解是这样的
> 广播流一般都是为了减少访问外部配置数据,提高性能来使用的,因此如果你是在这种场景下使用播流,我有一个在生产实践过的方法可供参考。
>
> 可以先在正常数据处理流的open方法中初始化访问一次配置,后续配置变更时再去使用广播中的数据对配置进行更新。如果硬要求某些数据必须在某个广播流配置数据更新后才能进行处理,则可以使用大佬们在上面提供的用state存储的方式进行解决。
>
>
> -- 原始邮件 --
> 发件人: Yun Gao  发送时间: 2020年5月24日 13:56
> 收件人: 462329521 <462329...@qq.com, user-zh  
> 主题: 回复:使用广播流要怎么保证广播流比数据流先到?
>
>
>
> Hello,据我了解,现在应该法有办法做到让一个流先到。
>
> 一种workaround的方法应该是在广播流全部到达之前,通过state先缓存收到的数据;然后等到广播流到达后再进行处理。
>
>
> --
> Sender:462329521<462329...@qq.com
> Date:2020/05/24 11:32:17
> Recipient:user-zh Theme:使用广播流要怎么保证广播流比数据流先到?
>
> 你好,我想问一下我们在业务系统中需要广播流比数据流先到,要怎么保证这种先后顺序?


Re: 关于水位线Watermark的理解

2020-05-24 Thread tison
整体没啥问题,但是我看你说【假如第一个数据的事件时间刚好为12:00的,那么此时水位线应该在11:59】,这个 Watermark 跟
allowedLateness 没啥关系哈,是独立的逻辑。

文档层面你可以看看[1],源码你可以看看[2]里面检索 allowedLateness

Best,
tison.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#allowed-lateness
[2]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java



Benchao Li  于2020年5月24日周日 下午9:56写道:

> Hi,
> 你理解的是正确的,进入哪个窗口完全看事件时间,窗口什么时候trigger,是看watermark。
>
> smq <374060...@qq.com> 于2020年5月24日周日 下午9:46写道:
>
> >
> >
> 使用时间时间窗口处理关于数据延迟,加入允许延迟时间为1min,窗口大小是10min,那么在12:00-12:10这个窗口中,如果事件时间是在12:09:50这个数据在12:10:50这个数据到达,并且此时水位线刚好在12:09:50,那么这个延迟数据可以被处理,这个可以理解。
> >
> >
> 但是,假如第一个数据的事件时间刚好为12:00的,那么此时水位线应该在11:59,这个数据能进入12:00-12:10这个窗口被处理吗。按道理来说应该被正确处理。那么这样的话,进入窗口是按照事件时间,触发是按照水印时间。不知道这么理解对不对,这个问题想了很久。
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: 關於LocalExecutionEnvironment使用MiniCluster的配置

2020-05-24 Thread tison
是这样的。

这里的配置可以参考[1][2]两个类,具体你 Maven 启动的代码路径还跟[3][4]有关。

这边可能确实文档比较缺失。可以看下配置传递的路径,TM 的数量还有 RPC 的共享格式等配置,至少编程接口上都是可以配的。

Best,
tison.

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
[2]
https://github.com/apache/flink/blob/ab947386ed93b16019f36c50e9a3475dd6ad3c4a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
[3]
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
[4]
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java




月月  于2020年5月24日周日 下午9:11写道:

> 您好,
> 在單機模式使用maven執行專案時,會自動啟動MiniCluster,
> 我想請問在這種情形下,預設是配置一個JobManager以及一個TaskManager嗎?
>
> 找了一下文件中並沒有相關的說明。
>
> 感謝!
>


Re: save point容灾方案咨询

2020-05-17 Thread tison
这个我理解不在 Flink 的范畴里啊。你 savepoint 存到一个位置,然后外部挂一个同步器在主集群和容灾集群里同步(savepoint
目录)就可以吧。

Best,
tison.


zhisheng  于2020年5月17日周日 下午8:40写道:

> hi
>
> 如果做 Checkpoint 或者 Savepoint 的时候可以填两个 HDFS 集群的地址路径(一个是你的主集群/另一个是容灾集群)
> 是不是就可以解决你现在的问题,达到你想要的需求?
>
> Best
>
> zhisheng
>
> 请叫我雷锋 <854194...@qq.com> 于2020年5月17日周日 下午7:32写道:
>
> > 谢谢关注:
> >
> >
> > savepoint 容灾 是指的,每次执行savepoint生成的文件,能够在容灾集群上做备份。当主集群变得不可用时,可以将任务迁移到容灾
> > 集群进行根据savepoint 进行任务恢复。
> >
> >
> > --原始邮件--
> > 发件人:"Congxian Qiu" > 发送时间:2020年5月17日(星期天) 晚上6:01
> > 收件人:"user-zh" >
> > 主题:Re: save point容灾方案咨询
> >
> >
> >
> > 你好
> >
> > 请问这里的 savepoint 容灾的 “容灾” 具体是指什么呢?希望解决什么问题呢?
> >
> > Best,
> > Congxian
> >
> >
> > LakeShen  >
> >  Hi ,
> > 
> >  你可以把你的场景在描述的详细一些。
> > 
> >  Best,
> >  LakeShen
> > 
> >  请叫我雷锋 <854194...@qq.com 于2020年5月14日周四 下午9:42写道:
> > 
> >   各位大佬好,请问有啥好的save point容灾方案嘛?
> >  
> >  
> >  
> >   发自我的iPhone
> > 
>


Re: 使用sql时候,设置了idle过期时间,但是状态还是一直变大

2020-05-17 Thread tison
考虑把 SQL 贴成 gist 链接?

Best,
tison.


claylin <1012539...@qq.com> 于2020年5月17日周日 下午5:32写道:

> sql作业定义如下,也通过TableConfig设置了最大和最小idle
> time,但是运行很长时间,查看sst的目录flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9下,状态还是在一直变打,导致作业线程读写state很耗时间,最后作业处于一直反压状态,求大佬支招CREATE
> TABLE yy_yapmnetwork_original ( happenAt BIGINT, uid BIGINT,
>  appId STRING, deviceId STRING, appVer STRING, dnsDur BIGINT,
>useGlb INT, hitCache INT, requestSize DOUBLE, responseSize
> DOUBLE, totalDur BIGINT, url STRING, statusCode INT,
>  prototype STRING, netType STRING, traceId STRING, ts AS
> CAST(FROM_UNIXTIME(happenAt/1000) AS TIMESTAMP(3)), WATERMARK FOR ts AS
> ts - INTERVAL '20' SECOND )with ( 'connector.type' = 'kafka',
> 'connector.version' = 'universal', 'connector.topic' = 'yapm_metrics',
> 'connector.properties.zookeeper.connect' = 'localhost:2181',
> 'connector.properties.bootstrap.servers' = 'kafkawx007-core001.yy.com:8101
> ,kafkawx007-core002.yy.com:8101,kafkawx007-core003.yy.com:8101', '
> connector.properties.group.id' = 'interface_success_rate_consumer',
> 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' );
> create table request_latency_tbl ( app_id string, app_ver string,
>net_type string, prototype string, url string, status_code
> int, w_start string, success_cnt BIGINT, failure_cnt BIGINT,
>  total_cnt BIGINT ) with( 'connector.type' = 'jdbc', 'connector.url' =
> 'jdbc:mysql://localhost:3315/yapm_metrics?useUnicode=truecharacterEncoding=utf-8zeroDateTimeBehavior=convertToNullautoReconnect=true',
> 'connector.table' = 'request_latency_statistics', 'connector.username' =
> 'yapm_metrics', 'connector.password' = '1234456',
> 'connector.write.flush.max-rows' = '1000', 'connector.write.flush.interval'
> = '5s', 'connector.write.max-retries' = '2' ); create view
> request_1minutes_latency  as select appId, appVer, netType, prototype,
> url, statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm') w_start,
>  count(distinct traceId) filter (where statusCode in (200)) as successCnt,
>count(distinct traceId) filter (where statusCode not in (200)) as
> failureCnt, count(distinct traceId) as total_cnt from
> yy_yapmnetwork_original group by appId, appVer, netType, prototype, url,
> statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm'); insert into
> request_latency_tbl select * from  request_1minutes_latency;


Re: flink barrier对齐 理解

2020-05-17 Thread tison
Hi,

你可以看一下官网这张经典的图[1][2],snapshot 是按算子级别来看的,跟 source 不 source 没啥关系,全局的 chk 由 jm
上的 checkpoint coordinator 协调。

Best,
tison.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/fig/stream_aligning.svg
[2]
https://ci.apache.org/projects/flink/flink-docs-master/concepts/stateful-stream-processing.html




了不起的盖茨比 <573693...@qq.com> 于2020年5月17日周日 下午2:50写道:

>
> 我的理解是一条数据,会经过n个算子,只有这个数据到达最后一个算子计算完毕,才能checkpoint,否则会导致前几个算子state改变,但是这个数据的offset没有被提交,导致了重复消费数据。
>
>
>
>
>
> -- 原始邮件 --
> 发件人: Benchao Li  发送时间: 2020年5月17日 13:28
> 收件人: user-zh  主题: 回复:flink barrier对齐 理解
>
>
>
> 我感觉应该是这样的:
>
> 比如有两个算子
> A ---hash--- B
>
> A和B分别有2和3个并发。那就是说对于B的某个subtask来讲,需要对齐上游A的2个subtask发过来的barrier,才能做checkpoint。
>
>
> 了不起的盖茨比 <573693...@qq.com 于2020年5月17日周日 下午1:16写道:
>
>  可以理解成,有多个subtask时候,需要等待不同subtask消费数据完毕,之后做checkpoint
> 
> 
> 
> 
> 
>  -- 原始邮件 --
>  发件人: Benchao Li   发送时间: 2020年5月17日 11:34
>  收件人: user-zh   主题: 回复:flink barrier对齐 理解
> 
> 
> 
>  Hi,
> 
>  我对这块不是非常了解,但是我理解的barrier对齐,指的是同一个Task的多个subtask之间对齐吧。
>  比如你只有一个source,然后经过keyby之后做了其他的操作,那也是存在barrier对齐的。
> 
>  了不起的盖茨比 <573693...@qq.comgt; 于2020年5月17日周日 上午11:29写道:
> 
>  gt; 请教一下,如果只有一个source,就不需要对齐了吧?只有source多个数据源时候才需要对齐?
> 
> 
> 
>  --
> 
>  Benchao Li
>  School of Electronics Engineering and Computer Science, Peking
> University
>  Tel:+86-15650713730
>  Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: flink build-in 的 udf 的源码

2020-05-16 Thread tison
Hi Benchao,

我想搭车问一下这个代码生成是全局仅一次还是每个 call 都会走一遍流程?或者是其他策略。

Best,
tison.


Benchao Li  于2020年5月16日周六 下午9:50写道:

> Hi,
>
> Flink内置函数的实现方式跟udf不太一样,很多函数是直接用的代码生成来做的。
>
> 下面是以blink planner为例,大概说下流程:
> 1. FlinkSqlOperatorTable 这个类里面放的是内置函数表,这个表会被calcite parse
> SQL的时候用到,直接把这些函数识别为具体的某个函数定义。
> 2.
>
> 然后再代码生成阶段,会识别到这些函数,根据不同的函数定义,生成不同的函数实现调用。这部分你可以直接看下`org.apache.flink.table.planner.codegen.calls`这个package下的代码。
> 3. 上面第2条说的主要是scalar function的生成方式,agg
>
> function还要特殊一点,这部分可以参考下`org.apache.flink.table.planner.functions.aggfunctions`这个package下的代码。
>
>
> venn  于2020年5月16日周六 下午3:53写道:
>
> > 各位大佬,请问下,flink 内置的 udf 的源码在什么位置,还有在哪里完成的函数注
> > 册? 非常感谢各位大佬回复
> >
> >
> >
> > Thanks a lot !
> >
> >
> >
> >
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: 关于Flink1.10 Standalone 模式任务提交

2020-04-23 Thread tison
另外你 shaded 里面去 shaded com.ibm.icu 也意义不明...

Best,
tison.


tison  于2020年4月23日周四 下午3:34写道:

> 这个问题我建议你记一个 JIRA 然后提供一个可复现的程序。因为你如果是 Flink Standalone Session 模式,在 Client
> 端编译失败抛出如上异常,不应该跟放不放在 lib 下有什么关系。这边听你说感觉也很奇怪,可能需要本地复现一下比较好判断。
>
> Best,
> tison.
>
>
> 宇张  于2020年4月23日周四 上午11:53写道:
>
>> 你的意思是,UberJar 不放在 lib 里,在用户程序里通过线程上下文 ClassLoader 能加载到
>> KafkaTableSourceSinkFactory
>> 吗?(同时 class loading 为 child-first)
>> 》》是的
>>
>> On Thu, Apr 23, 2020 at 11:42 AM tison  wrote:
>>
>> > >》拿到ClassLoader后看下能不能取到KafkaTableSourceSinkFactory的class
>> > >这个能拿到
>> >
>> > 你的意思是,UberJar 不放在 lib 里,在用户程序里通过线程上下文 ClassLoader 能加载到
>> > KafkaTableSourceSinkFactory
>> > 吗?(同时 class loading 为 child-first)
>> >
>> > 如果是这样,听起来 client 的 classloading 策略没啥问题,似乎是 SPI 加载那边的 ClassLoader 有问题。之前
>> > FileSystem 相关解析就出过类似的 ClassLoader 的 BUG
>> >
>> > Best,
>> > tison.
>> >
>> >
>> > 宇张  于2020年4月23日周四 上午11:36写道:
>> >
>> > > 我尝试进行了添加,程序依然无法运行,异常信息和上面一致,下面是我的shade配置:
>> > >
>> > > 
>> > > org.apache.maven.plugins
>> > > maven-shade-plugin
>> > > 
>> > > 
>> > > 
>> > > package
>> > > 
>> > > shade
>> > > 
>> > > 
>> > > 
>> > > > > >
>> > >
>> > >
>> >
>> implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>> > >
>> > > com.akulaku.data.main.StreamMain
>> > > 
>> > > 
>> > >
>> > > 
>> > > 
>> > > *:*
>> > > 
>> > > META-INF/*.SF
>> > > META-INF/*.DSA
>> > > META-INF/*.RSA
>> > > 
>> > > 
>> > > 
>> > >
>> > > 
>> > > 
>> > > 
>> > >
>> > > org.apache.flink:flink-table-common
>> > >
>> > > org.apache.flink:flink-table-api-java
>> > >
>> > > org.apache.flink:flink-table-api-java-bridge_2.11
>> > >
>> > > org.apache.flink:flink-table-planner-blink_2.11
>> > >
>> > > org.apache.flink:flink-connector-kafka-0.11_2.11
>> > >
>> > > org.apache.flink:flink-connector-kafka-base_2.11
>> > > org.apache.flink:flink-json
>> > > 
>> > > 
>> > > 
>> > > 
>> > > 
>> > > com.ibm.icu
>> > >
>> > >
>> org.apache.flink.table.shaded.com.ibm.icu
>> > > 
>> > > 
>> > > 
>> > > 
>> > > 
>> > > 
>> > >
>> > >
>> > > On Thu, Apr 23, 2020 at 10:53 AM Jingsong Li 
>> > > wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > Flink的connector发现机制是通过java
>> > spi服务发现机制的,所以你的services下文件不包含Kafka相关的内容就不会加载到。
>> > > >
>> > > > > 而且两种打包方式运行时是都能加载到KafkaFactory类文件的
>> > > >
>> > > > 只是类文件是没有用的,没地方引用到它。
>> > > >
>> > > > 你试试[1]中的方法?添加combine.children
>> > > >
>> > > > [1]
>> > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-uber-blink/pom.xml#L104
>> > > >
>> > > > Best,
>> > > > Jingsong Lee
>> > > >
>> > > > On Thu, Apr 23, 2020 at 10:37 AM 宇张  wrote:
>> > > >
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> 我这面采用shade打包方式进行了尝试,发现依然运行出错,运行错误日志与assembly打包产生的错误日志一致,就是上面提到的错误,而且shade和assembly打包产生的
>> > > > >
>> >

Re: 关于Flink1.10 Standalone 模式任务提交

2020-04-23 Thread tison
这个问题我建议你记一个 JIRA 然后提供一个可复现的程序。因为你如果是 Flink Standalone Session 模式,在 Client
端编译失败抛出如上异常,不应该跟放不放在 lib 下有什么关系。这边听你说感觉也很奇怪,可能需要本地复现一下比较好判断。

Best,
tison.


宇张  于2020年4月23日周四 上午11:53写道:

> 你的意思是,UberJar 不放在 lib 里,在用户程序里通过线程上下文 ClassLoader 能加载到
> KafkaTableSourceSinkFactory
> 吗?(同时 class loading 为 child-first)
> 》》是的
>
> On Thu, Apr 23, 2020 at 11:42 AM tison  wrote:
>
> > >》拿到ClassLoader后看下能不能取到KafkaTableSourceSinkFactory的class
> > >这个能拿到
> >
> > 你的意思是,UberJar 不放在 lib 里,在用户程序里通过线程上下文 ClassLoader 能加载到
> > KafkaTableSourceSinkFactory
> > 吗?(同时 class loading 为 child-first)
> >
> > 如果是这样,听起来 client 的 classloading 策略没啥问题,似乎是 SPI 加载那边的 ClassLoader 有问题。之前
> > FileSystem 相关解析就出过类似的 ClassLoader 的 BUG
> >
> > Best,
> > tison.
> >
> >
> > 宇张  于2020年4月23日周四 上午11:36写道:
> >
> > > 我尝试进行了添加,程序依然无法运行,异常信息和上面一致,下面是我的shade配置:
> > >
> > > 
> > > org.apache.maven.plugins
> > > maven-shade-plugin
> > > 
> > > 
> > > 
> > > package
> > > 
> > > shade
> > > 
> > > 
> > > 
> > >  > >
> > >
> > >
> >
> implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> > >
> > > com.akulaku.data.main.StreamMain
> > > 
> > > 
> > >
> > > 
> > > 
> > > *:*
> > > 
> > > META-INF/*.SF
> > > META-INF/*.DSA
> > > META-INF/*.RSA
> > > 
> > > 
> > > 
> > >
> > > 
> > > 
> > > 
> > >
> > > org.apache.flink:flink-table-common
> > >
> > > org.apache.flink:flink-table-api-java
> > >
> > > org.apache.flink:flink-table-api-java-bridge_2.11
> > >
> > > org.apache.flink:flink-table-planner-blink_2.11
> > >
> > > org.apache.flink:flink-connector-kafka-0.11_2.11
> > >
> > > org.apache.flink:flink-connector-kafka-base_2.11
> > > org.apache.flink:flink-json
> > > 
> > > 
> > > 
> > > 
> > > 
> > > com.ibm.icu
> > >
> > >
> org.apache.flink.table.shaded.com.ibm.icu
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > >
> > >
> > > On Thu, Apr 23, 2020 at 10:53 AM Jingsong Li 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Flink的connector发现机制是通过java
> > spi服务发现机制的,所以你的services下文件不包含Kafka相关的内容就不会加载到。
> > > >
> > > > > 而且两种打包方式运行时是都能加载到KafkaFactory类文件的
> > > >
> > > > 只是类文件是没有用的,没地方引用到它。
> > > >
> > > > 你试试[1]中的方法?添加combine.children
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-uber-blink/pom.xml#L104
> > > >
> > > > Best,
> > > > Jingsong Lee
> > > >
> > > > On Thu, Apr 23, 2020 at 10:37 AM 宇张  wrote:
> > > >
> > > > >
> > > > >
> > > >
> > >
> >
> 我这面采用shade打包方式进行了尝试,发现依然运行出错,运行错误日志与assembly打包产生的错误日志一致,就是上面提到的错误,而且shade和assembly打包产生的
> > > > >
> > > > >
> > > >
> > >
> >
> META-INF/services/org.apache.flink.table.factories.TableFactory文件及里面的内容一致,而且两种打包方式运行时是都能加载到KafkaFactory类文件的,所以貌似不是打包导致的问题,而更像是bug
> > > > > 下面是我maven插件配置:
> > > > >
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > >

Re: 关于Flink1.10 Standalone 模式任务提交

2020-04-22 Thread tison
>》拿到ClassLoader后看下能不能取到KafkaTableSourceSinkFactory的class
>这个能拿到

你的意思是,UberJar 不放在 lib 里,在用户程序里通过线程上下文 ClassLoader 能加载到
KafkaTableSourceSinkFactory
吗?(同时 class loading 为 child-first)

如果是这样,听起来 client 的 classloading 策略没啥问题,似乎是 SPI 加载那边的 ClassLoader 有问题。之前
FileSystem 相关解析就出过类似的 ClassLoader 的 BUG

Best,
tison.


宇张  于2020年4月23日周四 上午11:36写道:

> 我尝试进行了添加,程序依然无法运行,异常信息和上面一致,下面是我的shade配置:
>
> 
> org.apache.maven.plugins
> maven-shade-plugin
> 
> 
> 
> package
> 
> shade
> 
> 
> 
> 
>
> implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>
> com.akulaku.data.main.StreamMain
> 
> 
>
> 
> 
> *:*
> 
> META-INF/*.SF
> META-INF/*.DSA
> META-INF/*.RSA
> 
> 
> 
>
> 
> 
> 
>
> org.apache.flink:flink-table-common
>
> org.apache.flink:flink-table-api-java
>
> org.apache.flink:flink-table-api-java-bridge_2.11
>
> org.apache.flink:flink-table-planner-blink_2.11
>
> org.apache.flink:flink-connector-kafka-0.11_2.11
>
> org.apache.flink:flink-connector-kafka-base_2.11
> org.apache.flink:flink-json
> 
> 
> 
> 
> 
> com.ibm.icu
>
> org.apache.flink.table.shaded.com.ibm.icu
> 
> 
> 
> 
> 
> 
>
>
> On Thu, Apr 23, 2020 at 10:53 AM Jingsong Li 
> wrote:
>
> > Hi,
> >
> > Flink的connector发现机制是通过java spi服务发现机制的,所以你的services下文件不包含Kafka相关的内容就不会加载到。
> >
> > > 而且两种打包方式运行时是都能加载到KafkaFactory类文件的
> >
> > 只是类文件是没有用的,没地方引用到它。
> >
> > 你试试[1]中的方法?添加combine.children
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-uber-blink/pom.xml#L104
> >
> > Best,
> > Jingsong Lee
> >
> > On Thu, Apr 23, 2020 at 10:37 AM 宇张  wrote:
> >
> > >
> > >
> >
> 我这面采用shade打包方式进行了尝试,发现依然运行出错,运行错误日志与assembly打包产生的错误日志一致,就是上面提到的错误,而且shade和assembly打包产生的
> > >
> > >
> >
> META-INF/services/org.apache.flink.table.factories.TableFactory文件及里面的内容一致,而且两种打包方式运行时是都能加载到KafkaFactory类文件的,所以貌似不是打包导致的问题,而更像是bug
> > > 下面是我maven插件配置:
> > >
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > org.apache.maven.plugins
> > > maven-shade-plugin
> > > 
> > > 
> > > 
> > > package
> > > 
> > > shade
> > > 
> > > 
> > > 
> > >  > >
> > >
> > >
> >
> implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> > >
> > > com.akulaku.data.main.StreamMain
> > > 
> > > 
> > >
> > > 
> > > 
> > > *:*
> > > 
> > > META-INF/*.SF
> > > META-INF/*.DSA
> > > META-INF/*.RSA
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > >
> > >
> > > On Wed, Apr 22, 2020 at 8:00 PM Jingsong Li 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > >
> > > >
> > >
> >
> 如果org.apache.flink.table.factories.TableFactory里面没有

Re: Flink 1.10.0 stop command

2020-04-22 Thread tison
To be precise, the cancel command would succeed on cluster side but the
response *might* lost so that client throws with TimeoutException. If it is
the case, this is the root which will be fixed in 1.10.1.

Best,
tison.


tison  于2020年4月23日周四 上午1:20写道:

> 'flink cancel' broken because of
> https://issues.apache.org/jira/browse/FLINK-16626
>
> Best,
> tison.
>
>
> Yun Tang  于2020年4月23日周四 上午1:18写道:
>
>> Hi
>>
>> I think you could still use ./bin/flink cancel  to cancel the job.
>> What is the exception thrown?
>>
>> Best
>> Yun Tang
>> --
>> *From:* seeksst 
>> *Sent:* Wednesday, April 22, 2020 18:17
>> *To:* user 
>> *Subject:* Flink 1.10.0 stop command
>>
>>
>> Hi,
>>
>>
>>When i test 1.10.0, i found i must to set savepoint path otherwise i
>> can’t stop the job. I confuse about this, beacuse as i know, savepoint
>> offen large than checkpoint, so i usually resume job from checkpoint.
>> Another problem is sometimes job throw exception and i can’t trigger a
>> savepoint, so i cancel the job and change logical, resume it from last
>> checkpoint. although sometimes will failed, i think this will be a better
>> way, because i can choose cancel with a savepoint or not, so i can decede
>> how to resume. but in 1.10.0, i must to set it, and seems system will
>> trigger savepoint, i think this will take more risk, and it will delete
>> checkpoint even i set retain on cancellation. so i have no checkpoint left.
>> If i use cancel , it will break with exception.
>>
>> So how to work with 1.10.0 ? any advice will be helpful.
>>
>>   Thanks.
>>
>>
>>
>


Re: Flink 1.10.0 stop command

2020-04-22 Thread tison
'flink cancel' broken because of
https://issues.apache.org/jira/browse/FLINK-16626

Best,
tison.


Yun Tang  于2020年4月23日周四 上午1:18写道:

> Hi
>
> I think you could still use ./bin/flink cancel  to cancel the job.
> What is the exception thrown?
>
> Best
> Yun Tang
> --
> *From:* seeksst 
> *Sent:* Wednesday, April 22, 2020 18:17
> *To:* user 
> *Subject:* Flink 1.10.0 stop command
>
>
> Hi,
>
>
>When i test 1.10.0, i found i must to set savepoint path otherwise i
> can’t stop the job. I confuse about this, beacuse as i know, savepoint
> offen large than checkpoint, so i usually resume job from checkpoint.
> Another problem is sometimes job throw exception and i can’t trigger a
> savepoint, so i cancel the job and change logical, resume it from last
> checkpoint. although sometimes will failed, i think this will be a better
> way, because i can choose cancel with a savepoint or not, so i can decede
> how to resume. but in 1.10.0, i must to set it, and seems system will
> trigger savepoint, i think this will take more risk, and it will delete
> checkpoint even i set retain on cancellation. so i have no checkpoint left.
> If i use cancel , it will break with exception.
>
> So how to work with 1.10.0 ? any advice will be helpful.
>
>   Thanks.
>
>
>


Re: 关于Flink1.10 Standalone 模式任务提交

2020-04-22 Thread tison
虽然你放到 lib 下就能行了听起来是个 BUG,能不能说明一下你的 Flink 版本还有具体的启动命令。

FLINK-13749 可能在早期版本上没有,另外 Standalone 的类加载如果是 PerJob 有更改过。

Best,
tison.


tison  于2020年4月22日周三 下午5:48写道:

> 看下你打包的 UberJar 里有没一个内容包括
>
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>
> 的文件
>
> META-INF/services/org.apache.flink.table.factories.TableFactory
>
> Best,
> tison.
>
>
> 宇张  于2020年4月22日周三 下午5:30写道:
>
>> 我这面使用Standalone模式运行Flink任务,但是Uber
>> Jar里面的TableSourceFactory不能被加载,即使设置了classloader.resolve-order:
>> child-first,只有放在lib目录才能加载得到,我看发布文档跟改了类加载策略,但是我不知道为什么Uber
>> Jar里面的Factory不能被加载
>> Flink Client respects Classloading Policy (FLINK-13749
>> <https://issues.apache.org/jira/browse/FLINK-13749>)
>> <
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#flink-client-respects-classloading-policy-flink-13749
>> >
>>
>> The Flink client now also respects the configured classloading policy,
>> i.e., parent-first or child-first classloading. Previously, only cluster
>> components such as the job manager or task manager supported this setting.
>> This does mean that users might get different behaviour in their programs,
>> in which case they should configure the classloading policy explicitly to
>> use parent-first classloading, which was the previous (hard-coded)
>> behaviour.
>>
>> 异常信息:
>>
>>   rg.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: findAndCreateTableSource failed.
>> at
>>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at
>>
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at
>>
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: org.apache.flink.table.api.TableException:
>> findAndCreateTableSource failed.
>> at
>>
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>> at
>>
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>> at
>>
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>> at
>>
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>> at
>>
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>> at
>>
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>> at
>>
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>> at
>>
>> org.apache.flink.table.planner.operatio

Re: 关于Flink1.10 Standalone 模式任务提交

2020-04-22 Thread tison
看下你打包的 UberJar 里有没一个内容包括

org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory

的文件

META-INF/services/org.apache.flink.table.factories.TableFactory

Best,
tison.


宇张  于2020年4月22日周三 下午5:30写道:

> 我这面使用Standalone模式运行Flink任务,但是Uber
> Jar里面的TableSourceFactory不能被加载,即使设置了classloader.resolve-order:
> child-first,只有放在lib目录才能加载得到,我看发布文档跟改了类加载策略,但是我不知道为什么Uber Jar里面的Factory不能被加载
> Flink Client respects Classloading Policy (FLINK-13749
> <https://issues.apache.org/jira/browse/FLINK-13749>)
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#flink-client-respects-classloading-policy-flink-13749
> >
>
> The Flink client now also respects the configured classloading policy,
> i.e., parent-first or child-first classloading. Previously, only cluster
> components such as the job manager or task manager supported this setting.
> This does mean that users might get different behaviour in their programs,
> in which case they should configure the classloading policy explicitly to
> use parent-first classloading, which was the previous (hard-coded)
> behaviour.
>
> 异常信息:
>
>   rg.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: findAndCreateTableSource failed.
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at
>
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: org.apache.flink.table.api.TableException:
> findAndCreateTableSource failed.
> at
>
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
> at
>
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
> at
>
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
> at
>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
> at
>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
> at
>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
> at
>
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
> at com.akulaku.data.main.StreamMain.main(StreamMain.java:87)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method

Re: json中date类型解析失败

2020-04-22 Thread tison
应该是有内置的 UDF FROM_UNIXTIME 可以用的

Best,
tison.


Leonard Xu  于2020年4月22日周三 下午1:15写道:

> Hi
>  报错是因为'format.ignore-parse-errors'
> 参数是在社区最新的版本才支持的,FLINK-16725在1.11应该也会修复,如果需要使用的话可以等1.11发布后使用或者自己编译master分支,
> 即使有了这个参数你的问题也无法解决,对你的case每行记录都会解析错误所以会过滤掉所有数据。
> 建议你可以在数据源就转为标准的json格式或者写个udf将long转为timestamp后使用。
>
> 祝好,
> Leonard Xu
>
> > 在 2020年4月22日,12:33,王双利  写道:
> >
> > 要不你们再做一个fastjson版本的?
> > 目前内部解析用的都是fastjson
> >
> >
> >
> > 发件人: 王双利
> > 发送时间: 2020-04-22 12:31
> > 收件人: user-zh
> > 主题: 回复: Re: json中date类型解析失败
> >配置后报错误 ,
> > 'format.ignore-parse-errors' = 'true'
> > 这个参数需要怎么配置呢?
> > The matching candidates:
> >org.apache.flink.formats.json.JsonRowFormatFactory
> >Unsupported property keys:
> >format.ignore-parse-errors
> > WITH (
> > ..
> > 'format.type' = 'json',
> > 'format.ignore-parse-errors' = 'true',
> > 
> > )
> >
> >
> >
> > 发件人: Leonard Xu
> > 发送时间: 2020-04-22 12:18
> > 收件人: user-zh; 王双利
> > 主题: Re: json中date类型解析失败
> > Hi,
> > flink支持的json format是遵循RFC标准[1]的,不支持从long型转化为json timestamp, json的
> tiemstamp类型转化可以简单参考下,这个虽然符合标准,单对用户习惯来说确实不友好,目前社区也有一个jira[2]在跟进这个问题了。关于鲁棒性的问题,json
> format有个参数支持跳过解析错误的记录,'format.ignore-parse-errors' = 'true'
> >
> >
> > ```
> > Long time = System.currentTimeMillis();
> > DateFormat dateFormat =  new
> SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'");
> > Date date = new Date(time);
> > String jsonSchemaDate = dateFormat.format(date);
> > ```
> > [1]
> https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times
> > [2] https://issues.apache.org/jira/browse/FLINK-16725 <
> https://issues.apache.org/jira/browse/FLINK-16725>
> >
> > Best,
> > Leonard Xu
> >
> >> 在 2020年4月22日,12:05,王双利  写道:
> >>
> >> 使用  flink-json -1.10.0 解析json数据报下面的错误
> >>
> >> Caused by: java.time.format.DateTimeParseException: Text
> '1587527019680' could not be parsed at index 0
> >>
> >> 经检查 是 以下字段导致的
> >>
> {"jnlno":"e4574cce-8c9f-4d3f-974f-fc15250ec10d","ip":"122.96.41.218","channel":"pc","transdate":1587527019680,"event":"login","userid":"9","deviceid":"11","taskid":"1","retcode":"00","status":"fail"}
> >>
> >> 其中 transdate 是使用fastjson序列化得来的
> >>
> request.put("transdate",cal.getTime());JSON.toJSONString(request),解析失败后,系统直接停止,这个觉得也不太好吧,鲁棒性不够,万一其他系统发送一个错误格式的,系统直接挂掉,感觉不太合理。
> >> 以上的应该怎么解决才合适。
> >>
> >>
> >
>
>


Re: flink1.10关于jar包冲突问题

2020-04-22 Thread tison
能具体看一下报错吗?一般来说 Flink 自己需要的依赖都会 shaded 起来,不需要的传递依赖都应该 exclude 掉。暴露成 API
的类别一般需要封装或者使用稳定的接口。

这可能是一个工程上的问题,你可以具体罗列一下遇到的 JAR 包冲突问题,看一下怎么解。

Best,
tison.


宇张  于2020年4月22日周三 上午11:52写道:

> 在使用Flink1.10时,遇到最多的问题就是jar包冲突问题,okio这个包flink-parent引用的就有四个版本,还有一些没办法<
> exclusions>的包,请问社区有没有优化jar包冲突的提议。
>


Re: 如何看到他人问题

2020-04-21 Thread tison
cc


Leonard Xu  于2020年4月21日周二 下午5:03写道:

> Hi,
> 订阅user-zh邮件邮件组即可收到该邮件组里的所有邮件,
> 可以发送任意内容的邮件到  user-zh-subscr...@flink.apache.org  订阅来自
> user-zh@flink.apache.org 邮件组的邮件
>
> 邮件组的订阅管理,可以参考[1]
>
> 祝好,
> Leonard Xu
> https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list
>
> > 在 2020年4月21日,16:55,一袭月色 <1906286...@qq.com> 写道:
> >
> > 如何看到他人问题
>
>


Re: Re: flink启动任务的方式

2020-04-21 Thread tison
REST API jar run endpoint 不支持关联其他 jar 听起来是个问题。FatJar 是一种解决方案,这个可以提到 JIRA
上作为需求(x

Best,
tison.


Arnold Zai  于2020年4月21日周二 下午5:46写道:

> jarFiles参数不是个参数列表么,多传几个。
>
> 或把依赖提前部署到${FLINK_HOME}/plugins里
>
> chenxuying  于2020年4月21日周二 下午3:36写道:
>
> > 这个是可以 , 不过我们的需求不允许打FatJar
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-04-21 15:27:48,"Arnold Zai"  写道:
> > >打个FatJar
> > >
> > >chenxuying  于2020年4月21日周二 下午2:47写道:
> > >
> > >> 请问下目前flink的启动方式有哪些
> > >> 1 通过命令行来执行
> > >> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
> > >> cn.xxx.flink.table.sql.Job
> /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
> > >> 2通过自带的webui页面上传jar , submit jar
> > >> 3 通过代码 createRemoteEnvironment
> > >>
> > >> 目前主要使用的是,通过代码请求rest api http请求来实现上传jar,跑任务, 但是目前遇到的问题是,通过rest api
> > >> 无法实现命令行那样提供其他的jar包
> > >>
> > >>
> > >>
> > >>
> >
>


Re: Job manager URI rpc address:port

2020-04-19 Thread tison
You can change flink-conf.yaml "jobmanager.address" or "jobmanager.port"
options before run the program or take a look at RemoteStreamEnvironment
which enables configuring host and port.

Best,
tison.


Som Lima  于2020年4月19日周日 下午5:58写道:

> Hi,
>
> After running
>
> $ ./bin/start-cluster.sh
>
> The following line of code defaults jobmanager  to localhost:6123
>
> final  ExecutionEnvironment env = Environment.getExecutionEnvironment();
>
> which is same on spark.
>
> val spark =
> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>
> However if I wish to run the servers on a different physical computer.
> Then in Spark I can do it this way using the spark URI in my IDE.
>
> Conf =
> SparkConf().setMaster("spark://:").setAppName("anapp")
>
> Can you please tell me the equivalent change to make so I can run my
> servers and my IDE from different physical computers.
>
>
>
>
>
>
>
>
>
>
>
>
>


Re: 1.10任务执行过程--源码的一些疑问

2020-04-19 Thread tison
invokable 一般是 StreamTask 或者它的子类 StreamSourceTask,具体的 UDF 在 StreamTask
里,有几层包装。

MailBox 那些其实是一个简单的 EventLoop 实现,或者你理解为 Actor Model 的实现也行,可以参考这些名词的解释文章一一对应。

Best,
tison.


祝尚 <17626017...@163.com> 于2020年4月19日周日 下午5:43写道:

> Hi,all
> 在阅读1.10有关job执行过程相关源码时遇到一些疑问,我在看到Task#doRun()方法
> invokable.invoke();具体执行过程应该在这个方法里吧?
> 进一步看了StreamTask#invoke()->runMailboxLoop();继续往下深入也没发现最终调用udf的入口
> 问题1:MailboxProcessor、Mailbox、Mail这些概念什么意思,什么作用?
>
>
> 然而在另一处实例化AbstractInvokable时,比如StreamTask构造函数里会调用processInput方法,这个就类似1.9之前的实现方式了
> this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox,
> actionExecutor);
> 问题2:这里面是真正的数据处理过程吗?为什么不像1.9之前那样在invokable.invoke()里面做业务处理?
> 感谢您的答复!
>
>
>
> Best,
> Sun.Zhu
>
>
>
>


Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-17 Thread tison
Hi Yang,

Name filtering & schema special handling makes sense for me. We can enrich
later if there is requirement without breaking interface.

For #1, from my perspective your first proposal is

  having an option specifies remote flink/lib, then we turn off auto
uploading local flink/lib and register that path as local resources

It seems we here add another special logic for handling one kind of
things...what I propose is we do these two steps explicitly separated:

1. an option turns off auto uploading local flink/lib
2. a general option register remote files as local resources

The rest thing here is that you propose we handle flink/lib as PUBLIC
visibility while other files as APPLICATION visibility, whether a
composite configuration or name filtering to special handle libs makes
sense though.

YarnClusterDescriptor already has a lot of special handling logics which
introduce a number of config options and keys, which should
have been configured in few of common options and validated at the runtime.

Best,
tison.


Yang Wang  于2020年4月17日周五 下午11:42写道:

> Hi tison,
>
> For #3, if you mean registering remote HDFS file as local resource, we
> should make the "-yt/--yarnship"
> to support remote directory. I think it is the right direction.
>
> For #1, if the users could ship remote directory, then they could also
> specify like this
> "-yt hdfs://hdpdev/flink/release/flink-1.x,
> hdfs://hdpdev/user/someone/mylib". Do you mean we add an
> option for whether trying to avoid unnecessary uploading? Maybe we could
> filter by names and file size.
> I think this is a good suggestion, and we do not need to introduce a new
> config option "-ypl".
>
> For #2, for flink-dist, the #1 could already solve the problem. We do not
> need to support remote schema.
> It will confuse the users when we only support HDFS, not S3, OSS, etc.
>
>
> Best,
> Yang
>
> tison  于2020年4月17日周五 下午8:05写道:
>
>> Hi Yang,
>>
>> I agree that these two of works would benefit from single assignee. My
>> concern is as below
>>
>> 1. Both share libs & remote flink dist/libs are remote ship files. I
>> don't think we have to implement multiple codepath/configuration.
>> 2. So, for concept clarification, there are
>>   (1) an option to disable shipping local libs
>>   (2) flink-dist supports multiple schema at least we said "hdfs://"
>>   (3) an option for registering remote shipfiles with path & visibility.
>> I think new configuration system helps.
>>
>> the reason we have to special handling (2) instead of including it in (3)
>> is because when shipping flink-dist to TM container, we specially
>> detect flink-dist. Of course we can merge it into general ship files and
>> validate shipfiles finally contain flink-dist, which is an alternative.
>>
>> The *most important* difference is (1) and (3) which we don't have an
>> option for only remote libs. Is this clarification satisfy your proposal?
>>
>> Best,
>> tison.
>>
>>
>> Till Rohrmann  于2020年4月17日周五 下午7:49写道:
>>
>>> Hi Yang,
>>>
>>> from what I understand it sounds reasonable to me. Could you sync with
>>> Tison on FLINK-14964 on how to proceed. I'm not super deep into these
>>> issues but they seem to be somewhat related and Tison already did some
>>> implementation work.
>>>
>>> I'd say it be awesome if we could include this kind of improvement into
>>> the release.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Apr 16, 2020 at 4:43 AM Yang Wang  wrote:
>>>
>>>> Hi All, thanks a lot for reviving this discussion.
>>>>
>>>> I think we could unify the FLINK-13938 and FLINK-14964 since they have
>>>> the similar
>>>> purpose, avoid unnecessary uploading and downloading jars in YARN
>>>> deployment.
>>>> The difference is FLINK-13938 aims to support the flink system lib
>>>> directory only, while
>>>> FLINK-14964 is trying to support arbitrary pre-uloaded jars(including
>>>> user and system jars).
>>>>
>>>>
>>>> So i suggest to do this feature as following.
>>>> 1. Upload the flink lib directory or users to hdfs, e.g.
>>>> "hdfs://hdpdev/flink/release/flink-1.x"
>>>> "hdfs://hdpdev/user/someone/mylib"
>>>> 2. Use the -ypl argument to specify the shared lib, multiple
>>>> directories could be specified
>>>> 3. YarnClusterDescriptor will use the pre-uploaded jars to avoid
>>>> unnecessary uploading,
>>>> both for system 

Re: Akka Error

2020-04-17 Thread tison
If you run a program using "flink run" in dist/bin, dependencies should be
taken care of.

Could you describe detailedly how you "start a flink program"? Did you
write an entrypoint, compile it and run by "java YouProgram"? If so, you
should configure classpath by yourself.

Best,
tison.


Alexander Borgschulze  于2020年4月18日周六 上午3:03写道:

> When I try to start a flink program, I get the following exception:
>
> com.typesafe.config.ConfigException$Missing: No configuration setting
> found for key 'akka.version'
> at
> com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:152)
> at
> com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:170)
> at
> com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
>
>
> Do I need to change some files and add a new key-value pair?
>


Re: multi-sql checkpoint fail

2020-04-17 Thread tison
Hi,

Could you share the stack traces?

Best,
tison.


forideal  于2020年4月18日周六 上午12:33写道:

> Hello friend
> I have two SQL, checkpoint fails all the time. One task is to open a
> sliding window for an hour, and then another task consumes the output data
> of the previous task. There will be no problem with the two tasks submitted
> separately.
>
> -- first Calculation-- second Write the calculation to redis-- firstinsert 
> into
>   dw_access_logselect
>   time_key,
>   query_nor,
>   query_nor_counter,
>   '1' as group_keyfrom(
> select
>   HOP_START(
> event_time_fake,
> interval '1' MINUTE,
> interval '60' MINUTE
>   ) as time_key,
>   query_nor,
>   count(1) as query_nor_counter
> from(
> select
>   RED_JSON_VALUE(request, '$.query_nor') as query_nor,
>   RED_JSON_VALUE(request, '$.target') as target,
>   event_time_fake
> from
>   (
> select
>   red_pb_parser(body, 'request') as request,
>   event_time_fake
> from
>   access_log_source
>   )
>   )
> group by
>   query_nor,
>   HOP(   -- sliding window size one hour, step one minute
> event_time_fake,
> interval '1' MINUTE,
> interval '60' MINUTE
>   )
>   )where
>   query_nor_counter > 100;
> -- secondinsert into
>   dw_sink_access_logselect
>   'fix_key' as `key`,
>   get_json_value(query_nor, query_nor_counter) as `value` -- agg_funcfrom
>   dw_access_loggroup by
>   tumble (time_key_fake, interval '1' MINUTE),
>   group_key
>
> Article Link:https://zhuanlan.zhihu.com/p/132764573
> Picture Link:
> https://pic4.zhimg.com/80/v2-d3b1105b1419fef3ea6b9176085a5597_1440w.jpg
> https://pic3.zhimg.com/80/v2-b6ea7b4a8368c4bae03afb94c723bcca_1440w.jpg
>
> Best, forideal
>
>
>
>
>
>


Re: Can I use Joda-Time in Flink?

2020-04-17 Thread tison
Hi Alexander,

What do you mean exactly? Could you describe it in pseudo code? I'm not
quite sure where Java-Time used in env.

Best,
tison.


Alexander Borgschulze  于2020年4月17日周五 下午9:21写道:

> Can I use Joda-Time instead of Java-Time and set it up in the
> StreamExecutionEnvironment?
>


Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-17 Thread tison
Hi Yang,

I agree that these two of works would benefit from single assignee. My
concern is as below

1. Both share libs & remote flink dist/libs are remote ship files. I don't
think we have to implement multiple codepath/configuration.
2. So, for concept clarification, there are
  (1) an option to disable shipping local libs
  (2) flink-dist supports multiple schema at least we said "hdfs://"
  (3) an option for registering remote shipfiles with path & visibility. I
think new configuration system helps.

the reason we have to special handling (2) instead of including it in (3)
is because when shipping flink-dist to TM container, we specially
detect flink-dist. Of course we can merge it into general ship files and
validate shipfiles finally contain flink-dist, which is an alternative.

The *most important* difference is (1) and (3) which we don't have an
option for only remote libs. Is this clarification satisfy your proposal?

Best,
tison.


Till Rohrmann  于2020年4月17日周五 下午7:49写道:

> Hi Yang,
>
> from what I understand it sounds reasonable to me. Could you sync with
> Tison on FLINK-14964 on how to proceed. I'm not super deep into these
> issues but they seem to be somewhat related and Tison already did some
> implementation work.
>
> I'd say it be awesome if we could include this kind of improvement into
> the release.
>
> Cheers,
> Till
>
> On Thu, Apr 16, 2020 at 4:43 AM Yang Wang  wrote:
>
>> Hi All, thanks a lot for reviving this discussion.
>>
>> I think we could unify the FLINK-13938 and FLINK-14964 since they have
>> the similar
>> purpose, avoid unnecessary uploading and downloading jars in YARN
>> deployment.
>> The difference is FLINK-13938 aims to support the flink system lib
>> directory only, while
>> FLINK-14964 is trying to support arbitrary pre-uloaded jars(including
>> user and system jars).
>>
>>
>> So i suggest to do this feature as following.
>> 1. Upload the flink lib directory or users to hdfs, e.g.
>> "hdfs://hdpdev/flink/release/flink-1.x"
>> "hdfs://hdpdev/user/someone/mylib"
>> 2. Use the -ypl argument to specify the shared lib, multiple directories
>> could be specified
>> 3. YarnClusterDescriptor will use the pre-uploaded jars to avoid
>> unnecessary uploading,
>> both for system and user jars
>> 4. YarnClusterDescriptor needs to set the system jars to public
>> visibility so that the distributed
>> cache in the YARN nodemanager could be reused by multiple applications.
>> This is to avoid
>> unnecessary downloading, especially for the "flink-dist-*.jar". For the
>> user shared lib, the
>> visibility is still set to "APPLICATION" level.
>>
>>
>> For our past internal use case, the shared lib could help with
>> accelerating the submission a lot.
>> Also it helps to reduce the pressure of HDFS when we want to launch many
>> applications together.
>>
>> @tison @Till Rohrmann  @Hailu, Andreas
>>  If you guys thinks the suggestion makes sense. I
>> will try to find some time to work on this and hope it could catch up
>> with release-1.1 cycle.
>>
>>
>> Best,
>> Yang
>>
>> Hailu, Andreas [Engineering]  于2020年4月16日周四
>> 上午8:47写道:
>>
>>> Okay, I’ll continue to watch the JIRAs. Thanks for the update, Till.
>>>
>>>
>>>
>>> *// *ah
>>>
>>>
>>>
>>> *From:* Till Rohrmann 
>>> *Sent:* Wednesday, April 15, 2020 10:51 AM
>>> *To:* Hailu, Andreas [Engineering] 
>>> *Cc:* Yang Wang ; tison ;
>>> user@flink.apache.org
>>> *Subject:* Re: Flink Conf "yarn.flink-dist-jar" Question
>>>
>>>
>>>
>>> Hi Andreas,
>>>
>>>
>>>
>>> it looks as if FLINK-13938 and FLINK-14964 won't make it into the 1.10.1
>>> release because the community is about to start the release process. Since
>>> FLINK-13938 is a new feature it will be shipped with a major release. There
>>> is still a bit of time until the 1.11 feature freeze and if Yang Wang has
>>> time to finish this PR, then we could ship it.
>>>
>>>
>>>
>>> Cheers,
>>>
>>> Till
>>>
>>>
>>>
>>> On Wed, Apr 15, 2020 at 3:23 PM Hailu, Andreas [Engineering] <
>>> andreas.ha...@gs.com> wrote:
>>>
>>> Yang, Tison,
>>>
>>>
>>>
>>> Do we know when some solution for 13938 and 14964 will arrive? Do you
>>> think it will be in a 1.10.x version?
>>>
>>>
>>>
>>

Re: Schema with TypeInformation or DataType

2020-04-17 Thread tison
Thanks for your inputs and sorry that I said Schema doesn't support
DataType to register a field because I was looking into Flink 1.9 codes...

Best,
tison.


Jark Wu  于2020年4月17日周五 下午2:42写道:

> Hi Tison,
>
> Migration from TypeInformation to DataType is a large work and will across
> many releases. As far as I can tell, we will finalize the work in 1.11.
> As godfrey said above, Flink SQL & Table API should always use DataType,
> DataStream uses TypeInformation.
>
> Schema already supports DataType to register a field, and the the method
> using TypeInformation to register field is deprecated since 1.10.
>
> Best,
> Jark
>
> On Fri, 17 Apr 2020 at 14:14, tison  wrote:
>
>> Hi,
>>
>> I notice that our type system has two branches. One  is TypeInformation
>> while the other is
>> DataType. It is said that Table API will use DataType but there are
>> several questions about
>> this statement:
>>
>> 1. Will TypeInformation be deprecated and we use DataType as type system
>> everywhere?
>> 2. Schema in Table API currently support only TypeInformation to register
>> a field, shall we support
>> the DataType way as well?
>>
>> Best,
>> tison.
>>
>


Schema with TypeInformation or DataType

2020-04-17 Thread tison
Hi,

I notice that our type system has two branches. One  is TypeInformation
while the other is
DataType. It is said that Table API will use DataType but there are several
questions about
this statement:

1. Will TypeInformation be deprecated and we use DataType as type system
everywhere?
2. Schema in Table API currently support only TypeInformation to register a
field, shall we support
the DataType way as well?

Best,
tison.


Re: 为消息分配时间戳但不想重新分配水印

2020-04-16 Thread tison
从语义上说,已经有产生 Watermark 的逻辑了,如果 forward 此前的 watermark
在其他一些用户场景下或许也不合适。从另一个角度考虑你也可以把 watermark 带在 element
上,实现 AssignerWithPunctuatedWatermarks 的 Watermark
checkAndGetNextWatermark(T lastElement, long extractedTimestamp); 方法时从
element 取出来

Best,
tison.


tison  于2020年4月16日周四 下午10:36写道:

> 喔,看差了...Flink 原生没有这样的支持,不过可以扩展基础的算子来实现你要的逻辑
>
> 参考 assignTimestampsAndWatermarks
> 的实现,以及 TimestampsAndPunctuatedWatermarksOperator,不要重写 processWatermark
> 方法,应该可以实现。DataStream 方面调用更基础的 transform 方法
>
> 如果你觉得这是一个合理的需求,应该开箱即用,也可以到 Flink 的 JIRA(issue)列表上提
> https://jira.apache.org/jira/projects/FLINK/issues 注册 id 后创建 JIRA 单即可
>
> Best,
> tison.
>
>
> taowang  于2020年4月16日周四 下午10:12写道:
>
>> 感谢回复,但是很抱歉我试了一下发现不可以。
>> 无论是使用了`AssignerWithPeriodicWatermarks`的`getCurrentWatermark`还是`AssignerWithPunctuatedWatermarks`的`checkAndGetNextWatermark`,当它们`return
>> null`时下游算子拿到的水印都显示为`No
>> Watermark`,我在下游算子中打印出`context.currentWatermark()`发现都是`Long.MIN_VALUE`的值。
>> 看了这两个接口文档,不太理解这里的`no new watermark will be
>> generated.`是什么意思,一般理解为不会生成新的水印那之前的水印就应该被继续传下去吧,但实际看来是下游接收不到水印(`no
>> watermark`?)。
>> @tison,你那边是尝试过这种写法吗,如果可以的话,能麻烦给一个简单的demo吗。
>>
>>
>> 感谢帮助!
>> ```
>> public interface AssignerWithPeriodicWatermarks extends
>> TimestampAssigner {
>>
>>  /**
>>  * Returns the current watermark. This method is periodically called by
>> the
>>  * system to retrieve the current watermark. The method may return {@code
>> null} to
>>  * indicate that no new Watermark is available.
>>  *
>>  * The returned watermark will be emitted only if it is non-null and
>> its timestamp
>>  * is larger than that of the previously emitted watermark (to preserve
>> the contract of
>>  * ascending watermarks). If the current watermark is still
>>  * identical to the previous one, no progress in event time has happened
>> since
>>  * the previous call to this method. If a null value is returned, or the
>> timestamp
>>  * of the returned watermark is smaller than that of the last emitted
>> one, then no
>>  * new watermark will be generated.
>>  *
>>  * The interval in which this method is called and Watermarks are
>> generated
>>  * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
>>  *
>>  * @see org.apache.flink.streaming.api.watermark.Watermark
>>  * @see ExecutionConfig#getAutoWatermarkInterval()
>>  *
>>  * @return {@code Null}, if no watermark should be emitted, or the next
>> watermark to emit.
>>  */
>>  @Nullable
>>  Watermark getCurrentWatermark();
>> }
>> ```
>>
>>
>>  原始邮件
>> 发件人: tison
>> 收件人: user-zh
>> 发送时间: 2020年4月16日(周四) 20:33
>> 主题: Re: 为消息分配时间戳但不想重新分配水印
>>
>>
>> 在 getCurrentWatermark 里返回 null 就行了,会 forward 此前的 watermark 的。另外语义上使用
>> AssignerWithPunctuatedWatermarks 会更合适一点。 Best, tison. taowang <
>> taow...@deepglint.com> 于2020年4月16日周四 下午5:13写道: > Hello,大家好: > 在flink >
>> stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。 >
>> 为了实现这个功能,我想有两种方法: > 1. 在算子输出后面重新为消息分配水印:看到flink >
>> stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark
>> > `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。 > 2. >
>> 在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。
>> > > > 我现在只能使用`assignTimestampsAndWatermarks` >
>> 去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗? > 感谢解答!
>
>


Re: 为消息分配时间戳但不想重新分配水印

2020-04-16 Thread tison
喔,看差了...Flink 原生没有这样的支持,不过可以扩展基础的算子来实现你要的逻辑

参考 assignTimestampsAndWatermarks
的实现,以及 TimestampsAndPunctuatedWatermarksOperator,不要重写 processWatermark
方法,应该可以实现。DataStream 方面调用更基础的 transform 方法

如果你觉得这是一个合理的需求,应该开箱即用,也可以到 Flink 的 JIRA(issue)列表上提
https://jira.apache.org/jira/projects/FLINK/issues 注册 id 后创建 JIRA 单即可

Best,
tison.


taowang  于2020年4月16日周四 下午10:12写道:

> 感谢回复,但是很抱歉我试了一下发现不可以。
> 无论是使用了`AssignerWithPeriodicWatermarks`的`getCurrentWatermark`还是`AssignerWithPunctuatedWatermarks`的`checkAndGetNextWatermark`,当它们`return
> null`时下游算子拿到的水印都显示为`No
> Watermark`,我在下游算子中打印出`context.currentWatermark()`发现都是`Long.MIN_VALUE`的值。
> 看了这两个接口文档,不太理解这里的`no new watermark will be
> generated.`是什么意思,一般理解为不会生成新的水印那之前的水印就应该被继续传下去吧,但实际看来是下游接收不到水印(`no
> watermark`?)。
> @tison,你那边是尝试过这种写法吗,如果可以的话,能麻烦给一个简单的demo吗。
>
>
> 感谢帮助!
> ```
> public interface AssignerWithPeriodicWatermarks extends
> TimestampAssigner {
>
>  /**
>  * Returns the current watermark. This method is periodically called by the
>  * system to retrieve the current watermark. The method may return {@code
> null} to
>  * indicate that no new Watermark is available.
>  *
>  * The returned watermark will be emitted only if it is non-null and
> its timestamp
>  * is larger than that of the previously emitted watermark (to preserve
> the contract of
>  * ascending watermarks). If the current watermark is still
>  * identical to the previous one, no progress in event time has happened
> since
>  * the previous call to this method. If a null value is returned, or the
> timestamp
>  * of the returned watermark is smaller than that of the last emitted one,
> then no
>  * new watermark will be generated.
>  *
>  * The interval in which this method is called and Watermarks are
> generated
>  * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
>  *
>  * @see org.apache.flink.streaming.api.watermark.Watermark
>  * @see ExecutionConfig#getAutoWatermarkInterval()
>  *
>  * @return {@code Null}, if no watermark should be emitted, or the next
> watermark to emit.
>  */
>  @Nullable
>  Watermark getCurrentWatermark();
> }
> ```
>
>
>  原始邮件
> 发件人: tison
> 收件人: user-zh
> 发送时间: 2020年4月16日(周四) 20:33
> 主题: Re: 为消息分配时间戳但不想重新分配水印
>
>
> 在 getCurrentWatermark 里返回 null 就行了,会 forward 此前的 watermark 的。另外语义上使用
> AssignerWithPunctuatedWatermarks 会更合适一点。 Best, tison. taowang <
> taow...@deepglint.com> 于2020年4月16日周四 下午5:13写道: > Hello,大家好: > 在flink >
> stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。 >
> 为了实现这个功能,我想有两种方法: > 1. 在算子输出后面重新为消息分配水印:看到flink >
> stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark
> > `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。 > 2. >
> 在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。
> > > > 我现在只能使用`assignTimestampsAndWatermarks` >
> 去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗? > 感谢解答!


Re: 为消息分配时间戳但不想重新分配水印

2020-04-16 Thread tison
在 getCurrentWatermark 里返回 null 就行了,会 forward 此前的 watermark
的。另外语义上使用 AssignerWithPunctuatedWatermarks 会更合适一点。

Best,
tison.


taowang  于2020年4月16日周四 下午5:13写道:

> Hello,大家好:
> 在flink
> stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。
> 为了实现这个功能,我想有两种方法:
> 1. 在算子输出后面重新为消息分配水印:看到flink
> stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark
> `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。
> 2.
> 在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。
>
>
> 我现在只能使用`assignTimestampsAndWatermarks`
> 去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗?
> 感谢解答!


Re: flink 1.7.2 YARN Session模式提交任务问题求助

2020-04-15 Thread tison
注意环境变量和 fs.hdfs.hdfsdefault 要配置成 HDFS 路径或 YARN
集群已知的本地路径,不要配置成客户端的路径。因为实际起作用是在拉起 TM 的那台机器上解析拉取的。

Best,
tison.


Chief  于2020年4月15日周三 下午7:40写道:

> hi Yangze Guo
> 您说的环境变量已经在当前用户的环境变量文件里面设置了,您可以看看我的问题描述,现在如果checkpoint的路径设置不是namenode
> ha的nameservice就不会报错,checkpoint都正常。
>
>
>
>
> --原始邮件--
> 发件人:"Yangze Guo" 发送时间:2020年4月15日(星期三) 下午3:00
> 收件人:"user-zh"
> 主题:Re: flink 1.7.2 YARN Session模式提交任务问题求助
>
>
>
> Flink需要设置hadoop相关conf位置的环境变量 YARN_CONF_DIR or HADOOP_CONF_DIR [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html
>
> Best,
> Yangze Guo
>
> On Mon, Apr 13, 2020 at 10:52 PM Chief  
>  大家好
>  目前环境是flink 1.7.2,使用YARN Session模式提交任务,Hadoop 版本2.7.3,hdfs
> namenode配置了ha模式,提交任务的时候报以下错误,系统环境变量中已经设置了HADOOP_HOME,YARN_CONF_DIR,HADOOP_CONF_DIR,HADOOP_CLASSPATH,在flink_conf.yaml中配置了fs.hdfs.hadoopconf
> 
> 
>  2020-04-10 19:12:02,908 INFOnbsp;
> org.apache.flink.runtime.jobmaster.JobMasternbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; - Connecting to ResourceManager akka.tcp://flink@trusfortpoc1
> :23584/user/resourcemanager()
>  2020-04-10 19:12:02,909 INFOnbsp;
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolnbsp; nbsp;
> nbsp; nbsp; nbsp; - Cannot serve slot request, no
> ResourceManager connected. Adding as pending request
> [SlotRequestId{0feacbb4fe16c8c7a70249f1396565d0}]
>  2020-04-10 19:12:02,911 INFOnbsp;
> org.apache.flink.runtime.jobmaster.JobMasternbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; - Resolved ResourceManager address, beginning registration
>  2020-04-10 19:12:02,911 INFOnbsp;
> org.apache.flink.runtime.jobmaster.JobMasternbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; - Registration at ResourceManager attempt 1 (timeout=100ms)
>  2020-04-10 19:12:02,912 INFOnbsp;
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolnbsp; nbsp;
> nbsp; nbsp; nbsp; - Cannot serve slot request, no
> ResourceManager connected. Adding as pending request
> [SlotRequestId{35ad2384e9cd0efd30b43f5302db24b6}]
>  2020-04-10 19:12:02,913 INFOnbsp;
> org.apache.flink.yarn.YarnResourceManagernbsp; nbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;- Registering job manager
> 0...@akka.tcp://flink@trusfortpoc1:23584/user/jobmanager_0
> for job 24691b33c18d7ad73b1f52edb3d68ae4.
>  2020-04-10 19:12:02,917 INFOnbsp;
> org.apache.flink.yarn.YarnResourceManagernbsp; nbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;- Registered job manager
> 0...@akka.tcp://flink@trusfortpoc1:23584/user/jobmanager_0
> for job 24691b33c18d7ad73b1f52edb3d68ae4.
>  2020-04-10 19:12:02,919 INFOnbsp;
> org.apache.flink.runtime.jobmaster.JobMasternbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; - JobManager successfully registered at ResourceManager, leader
> id: .
>  2020-04-10 19:12:02,919 INFOnbsp;
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolnbsp; nbsp;
> nbsp; nbsp; nbsp; - Requesting new slot
> [SlotRequestId{35ad2384e9cd0efd30b43f5302db24b6}] and profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
>  2020-04-10 19:12:02,920 INFOnbsp;
> org.apache.flink.yarn.YarnResourceManagernbsp; nbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;- Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> 24691b33c18d7ad73b1f52edb3d68ae4 with allocation id
> AllocationID{5a12237c7f2bd8b1cc760ddcbab5a1c0}.
>  2020-04-10 19:12:02,921 INFOnbsp;
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolnbsp; nbsp;
> nbsp; nbsp; nbsp; - Requesting new slot
> [SlotRequestId{0feacbb4fe16c8c7a70249f1396565d0}] and profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
>  2020-04-10 19:12:02,924 INFOnbsp;
> org.apache.flink.yarn.YarnResourceManagernbsp; nbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;- Requesting new TaskExecutor container with resources
>   2020-04-10 19:12:02,926 INFOnbsp;
> org.apache.flink.yarn.YarnResourceManagernbsp; nbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;- Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> 24691b33c18d7ad73b1f52edb3d68ae4 with allocation id
> AllocationID{37dd666a18040bf63ffbf2e022b2ea9b}.
>  2020-04-10 19:12:0

Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-15 Thread tison
FYI

https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamp_extractors.html

IngestionTime 的时间基准是进入 Flink 系统即 Source 算子生产数据时附上的时间戳,你后面有
window/join,这个时间的间隔不是确定性的,大数据量下因为 GC 等原因有不同延迟从而导致 Window 划分不一致是正常的。即使使用
EventTime,在 Watermark
不够可靠的情况下也有可能不一致,足够可靠又可能受到滞后数据的影响损失性能并占用内存。可以看看上面文档的相关内容。

Best,
tison.


tison  于2020年4月15日周三 下午10:18写道:

> IngestionTime 多次运行结果不一样很正常啊,试试 event time?
>
> Best,
> tison.
>
>
> xuefli  于2020年4月15日周三 下午10:10写道:
>
>> 遇到一个非常头痛的问题
>>
>> Flink1.10的集群,用hdfs做backend
>>
>> 一个流aStream准备了10亿的数据,另外一个流bStream百万
>> 如果如下操作
>>
>> 我遇到一个问题 双流Join 
>> 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同
>>  。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后 
>> 再对cStream进行keyBy-->timeWindow-->sum.
>> 我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
>> 用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
>> 但数据量很大时,就会这样。
>>
>>
>> 每次计算的结果不一样,这个对业务系统挑战巨大
>>
>>
>> 发送自 Windows 10 版邮件应用
>>
>>


Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-15 Thread tison
IngestionTime 多次运行结果不一样很正常啊,试试 event time?

Best,
tison.


xuefli  于2020年4月15日周三 下午10:10写道:

> 遇到一个非常头痛的问题
>
> Flink1.10的集群,用hdfs做backend
>
> 一个流aStream准备了10亿的数据,另外一个流bStream百万
> 如果如下操作
>
> 我遇到一个问题 双流Join 
> 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同
>  。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后 
> 再对cStream进行keyBy-->timeWindow-->sum.
> 我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
> 用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
> 但数据量很大时,就会这样。
>
>
> 每次计算的结果不一样,这个对业务系统挑战巨大
>
>
> 发送自 Windows 10 版邮件应用
>
>


Re: 关于flink run -m yarn提交失败。flink1.9

2020-04-14 Thread tison
-yd 参数影响的是你是否使用 perjob 模式提交作业,简单地说

with -yd 以 perjob 模式提交作业,即启动一个新集群
without -yd 提交到一个现有的 Flink on YARN 集群

哪个是你的需求呢?有没有实现用 yarn-session 启动 Flink on YARN 集群呢?

Best,
tison.


guanyq  于2020年4月15日周三 上午8:46写道:

> 提交失败,本人测试与-yd参数有关系,这个参数去掉就可以提交了。但是不知道 -yd这个参数影响了什么?
> At 2020-04-14 15:31:00, "guanyq"  wrote:
> >提交失败,yarn资源也还有很多,为什么会提交失败呢?
> >
> >提交脚本
> >./bin/flink run -m yarn-cluster \
> >-ynm TestDataProcess \
> >-yd \
> >-yn 2 \
> >-ytm 1024 \
> >-yjm 1024 \
> >-c com.data.processing.unconditionalacceptance.TestDataProcess \
> >./tasks/UnconditionalAcceptanceDataProcess.jar \
> >
> >
> >yarn资源
> >Apps Submitted Apps PendingApps RunningApps Completed  Containers
> Running  Memory Used Memory TotalMemory Reserved VCores Used
>  VCores TotalVCores Reserved Active NodesDecommissioned Nodes
> Lost Nodes  Unhealthy Nodes Rebooted Nodes
> >2390   12  227 173 334 GB  1.42 TB 0 B 173
>  288 0   9   0   0   0   0
> >
> >
> >
> >2020-04-14 15:14:19,002 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:19,253 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:19,504 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:19,755 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:20,006 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:20,257 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:20,508 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:20,759 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:21,011 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:21,262 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:21,513 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:21,764 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:22,015 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:22,265 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:22,517 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:22,768 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:23,019 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested re

  1   2   >