FLINK WEEKLY 2019/44

2019-11-04 文章 tison
FLINK WEEKLY 2019/44 <https://zhuanlan.zhihu.com/p/90155442>用户问题

Flink State 过期清除 TTL 问题
<https://lists.apache.org/x/thread.html/639627737fb1ff4e21f405b4c69e95ddb731e1357b2e228f8ccc99b4@%3Cuser-zh.flink.apache.org%3E>

回答包括了相关配置的设置和不同设置对应的清理时机

如何过滤异常的timestamp?
<https://lists.apache.org/x/thread.html/9f3e13aa2e58e64c4de2a2a6d7e0de53f7aec1786e31c68d3bc64894@%3Cuser-zh.flink.apache.org%3E>

EventTime 不可靠的时候采用 IngestionTime 过滤异常值

Flink 的 log 文件夹下产生了 34G 日志
<https://lists.apache.org/x/thread.html/3acc4de2a79f5c8b660659f9a48e23ec4338ebd8bc187b5bf2181988@%3Cuser-zh.flink.apache.org%3E>

重复的错误日志为 BlobServer 失败,可能是由于 Blob 文件被异常删除导致的。由于删除在 FLINK 框架外,FLINK
无限重试获取不存在的 Blob 文件产生了大量的日志

Flink SQL + savepoint
<https://lists.apache.org/x/thread.html/066ece099cc2c2dc8b8a32669229df48166a2a8af40d5aadbae9c23b@%3Cuser.flink.apache.org%3E>

Flink SQL 暂时不支持设置 uid

low performance in running queries
<https://lists.apache.org/x/thread.html/ffea739cf6776bbca520cda30d49ac70939a998e4a5acb953188a8c6@%3Cuser.flink.apache.org%3E>
Flink
1.5+ performance in a Java standalone environment
<https://lists.apache.org/x/thread.html/65ac92dc964c5c1b7a6e0987f300894e31f9033e214d5df197556036@%3Cuser.flink.apache.org%3E>

两个可能的性能问题

RemoteEnvironment cannot execute job from local.
<https://lists.apache.org/x/thread.html/83340a5a7bb6023897090f75efd2a2473f8eb9f82963bf3769709140@%3Cuser.flink.apache.org%3E>

回答介绍了如何使用 RemoteEnvironment 执行 FLINK 作业,主要是作业依赖的 jar 的上传的问题

Sending custom statsd tags
<https://lists.apache.org/x/thread.html/1f1538a86e3f9bc5a9019ead00366c29ade70f7ec5c6d51fbd1b14b9@%3Cuser.flink.apache.org%3E>

StatsD 作为 Metric Reporter 的时候暂时不支持定制化的 tag,需要用户自己扩展定制

[FlinkSQL] is there a way to read/write local json data in Flink SQL like
that of kafka?
<https://lists.apache.org/x/thread.html/72453bedb4fe974ec82a77102527d0148adde5583243bfe4fc0076e5@%3Cuser.flink.apache.org%3E>

FLINK 不支持开箱即用的读写本地 JSON 数据,但是可以通过组合现有功能实现类似的效果

Flink 1.8.1 HDFS 2.6.5 issue
<https://lists.apache.org/x/thread.html/1fa164ec8fee49e739761499c30ca4ea0bcc60310fb4b26f494bfc10@%3Cuser.flink.apache.org%3E>

Hadoop 的 BUG 导致配置 Kerberos+SSL 的时候 CryptoCodec 可能为 null

Checkpoint failed all the time
<https://lists.apache.org/x/thread.html/4c1da5925da7351eb24303d877537464a7178f5afc03bfb806ff862d@%3Cuser-zh.flink.apache.org%3E>

用户作业一直 checkpoint 失败,FLINK 1.9.0 实现了在 checkpoint 失败一定次数的情况下挂掉作业

Testing AggregateFunction() and ProcessWindowFunction() on KeyedDataStream
<https://lists.apache.org/x/thread.html/fb2c178c73f3bd5c6dcc3689fe6a1b88e3951e86620477f388c9bd48@%3Cuser.flink.apache.org%3E>

FLINK 内部测试代码中有丰富的测试套件(Harness)和测试样例可以参考
开发讨论

[ANNOUNCE] Progress of Apache Flink 1.10 #2
<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Progress-of-Apache-Flink-1-10-2-td34585.html>

Gary Yao 更新了 FLINK 1.10 中计划内的新特性的开发进度

FLIP-83: Flink End-to-end Performance Testing Framework
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-83%3A+Flink+End-to-end+Performance+Testing+Framework>

Yu Li 发起了 FLIP-83 的讨论,旨在为 FLINK 或者更广泛地说,流计算系统建立一套端到端的性能测试框架

[DISCUSS] FLIP-84: Improve & Refactor API of Table Module
<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-84-Improve-amp-Refactor-API-of-Table-Module-td34537.html>

Terry Wang 发起了 FLIP-84 的讨论,旨在重构部分设计不良的 Table API

[DISCUSS] Semantic and implementation of per-job mode
<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Semantic-and-implementation-of-per-job-mode-td34502.html#a34520>

tison 发起了关于 FLINK 中 per-job 模式语义的讨论。这主要是在实现 Flink on k8s 中重新审视 FLINK
的语义发现的问题,FLINK 目前的 per-job
模式将集群管理、作业管理和阻塞/非阻塞作业执行混杂在一起,缺乏一个明确的概念划分和配置。如果你使用了 FLINK per-job
模式并且喜欢它的某些行为或者认为某些行为出乎你的意料,欢迎回复这个邮件列表表达你的看法

[DISCUSS] Move flink-orc to flink-formats from flink-connectors
<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Move-flink-orc-to-flink-formats-from-flink-connectors-td34438.html>

Jingsong Lee 发起了关于将 flink-orc 模块移动到 flink-formats 的讨论,这主要是为了正确的分类
社区发展

[ANNOUNCE] Becket Qin joins the Flink PMC
<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Becket-Qin-joins-the-Flink-PMC-td34400.html>

Becket Qin 成为了 FLINK 的 PMC

FLINK FORWARD: THE KEY TAKEAWAYS
<https://research.euranova.eu/flink-forward-the-key-takeaways/>

Euroa Nova 撰写了 Flink Forward Europe 的总结

Streaming ETL With Apache Flink - Part 1
<https://dzone.com/articles/introduction-to-streaming-etl-with-apache-flink>

Preetdeep Kumar 撰写了一篇使用 FLINK 做流 ETL 的文章


Re: Mac操作系统下Ask timed out问題

2019-11-04 文章 tison
这个问题其实还挺常见的,问题有很多种可能。比如你看一下 log 下面 cluster 的日志看看
Dispatcher 有没有正常的起起来,flink-conf 你有没有改过导致超时过短(比如 1 ms)或者
资源不够。也有升级 jdk 小版本后就不复现的。

Best,
tison.


jeff kit  于2019年11月5日周二 下午2:43写道:

> 你好。
> 我本地的Flink是官网提供的Binary包,非自己编译的。
> 我相信我的情况是少数,绝大多数人的Mac都是能跑的。
>
> On Tue, Nov 5, 2019 at 2:24 PM Biao Liu  wrote:
>
> > 你好,
> >
> > MacOS 可以跑 Flink,我自己刚试了下,复制你的命令就可以跑。
> > 建议再查一下你本地的环境,你本地的 Flink 是自己编译的吗?如果不行试一下 Flink 提供的 binary 包 [1]?
> >
> > [1] https://flink.apache.org/downloads.html
> >
> > Thanks,
> > Biao /'bɪ.aʊ/
> >
> >
> >
> > On Tue, 5 Nov 2019 at 12:30, jeff kit  wrote:
> >
> > > HI,大家好:
> > > 我在运行Flink官方的Quick
> > > Start就遇到了问題。为了避免自己问蠢问題,我先做了很多尝试,如换Flink的版本,从1.7到1.8及至1.9都试过,在我自己的Mac OS
> > > X上这个问題是必然出现的,而换到其他操作系统例如Windows,则是正常的。
> > >
> > > 这也许不是一个常见的问題,更多是我本机的运行环境问題,但多天尝试下来仍然没有找到解决方法,才在这里求助一下。
> > >
> > > 操作步骤:
> > > 1. ./bin/start-cluster.sh  # 启动flink。
> > > 2. ./bin/flink run examples/batch/WordCount.jar   # 提交wordCount 包
> > >
> > > 随后就是抛了异常:
> > > Starting execution of program
> > > Executing WordCount example with default input data set.
> > > Use --input to specify file input.
> > > Printing result to stdout. Use --output to specify output path.
> > >
> > > 
> > >  The program finished with the following exception:
> > >
> > > org.apache.flink.client.program.ProgramInvocationException: Could not
> > > retrieve the execution result. (JobID:
> 81bc8720dee57710788cc8e41079ba4d)
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
> > > at
> > >
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> > > at
> > >
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
> > > at
> > >
> > >
> >
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
> > > at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> > > at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
> > > at
> > >
> >
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:88)
> > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > at
> > >
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > at
> > >
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > at java.lang.reflect.Method.invoke(Method.java:483)
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> > > at
> > >
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> > > at
> > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> > > at
> > >
> >
> org.apache.flink.client.cli.CliFrontend$$Lambda$31/1990451863.call(Unknown
> > > Source)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> > > Caused by: org.apache.flink.runtime.client.JobSubmissionException:
> Failed
> > > to submit JobGraph.
> > > at
> > >
> > >
> >
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
> > > at
> > >
> > >
> >

Re: Flink DataStream KeyedStream 与 AggregateFunction

2019-11-11 文章 tison
关于第二点,BLINK 和腾讯的 FLINK 定制版本 TDFLINK 都做了 local keyby 功能,
社区相关的讨论见 FLIP-44,可惜社区一直没啥时间跟进这个 Thread

Best,
tison.


Px New <15701181132mr@gmail.com> 于2019年11月10日周日 上午10:58写道:

> [image: image.png]建议深入解下 keyWindow,NoKeyWindow 与Assigner TimeWindow
> And WindowsFunction 
>
> Yuan,Youjun  于2019年11月9日周六 下午7:46写道:
>
>> 1, 是
>> 2,没有标准答案,是否可以本地先聚合?
>> 3,AggFunc是指定做何种聚合,是sum, 还是avg, 还是count。不指定的话,Flink哪里指导你要计算啥?
>>
>> -邮件原件-
>> 发件人: 王佩 
>> 发送时间: Saturday, November 9, 2019 11:45 AM
>> 收件人: user-zh 
>> 主题: Flink DataStream KeyedStream 与 AggregateFunction
>>
>> 请教下:
>>
>> 1、DataStream 如按用户ID KeyBy后,同一个用户ID的数据最终会被分到一个Partition中吗?
>>
>> 2、假设1成立,这样就会有数据倾斜的问题。该如何解决?
>>
>> 3、假设1成立,如: DataStream
>>.keyBy(userID)
>>.timeWindow()
>>.aggregate(new
>> AggregateFunction(...)),这里的AggregateFunction
>> 为啥还需要merge呢。因为同一个Key的数据只会在同一个Partition中被计算,觉得不需要merge啊。
>>
>> 这三个问题有点疑惑,大神们帮忙看下!
>> 感谢!
>>
>


FLINK WEEKLY 2019/45

2019-11-11 文章 tison
FLINK WEEKLY 2019/45 用户问题

   -

   FLINK 集群搭建的综合问题
   
:
   社区新用户提出了搭建 FLINK 集群的一系列问题,包括监控、作业上下线和恢复相关的问题。有业务接入 FLINK 需求的朋友可以跟踪一下这个
   Thread
   -

   FLINK 作业提交到集群执行异常
   
:
   Kafka 依赖相关的打包问题,表现为提交不报错,运行时 ClassNotFoundException。这与 FLINK 的类加载机制有关
   -

   Flink DataStream KeyedStream 与 AggregateFunction
   
:
   DataStream 上 keyBy 和 aggregate 语义和使用上的问题
   -

   Flink同一DAG里两个并行动态流的问题
   
:
   具体业务发现两个并行动态流性能相互影响的性能问题
   -

   Mac操作系统下Ask timed out问題
   
:
   神奇的升级 JDK 8 小版本之后就解决了,这里充满了玄学,我在这个 Thread 的回复里回忆了一下之前看到这个问题的各种原因和解法
   -

   在 Trigger里可以知道 Window中数据的状况吗
   

   -

   Fwd: 广播状态是否可以设置ttl过期时间
   

   -

   Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
   

   -

   如何让Flink trigger只输出有变化的数据?
   

   -

   [metrics] metrics 中 Availability 和 Checkpointing 这两组没有显示
   

   -

   flink SQL UpsertTable 语义问题
   

   -

   Flink savepoint(checkpoint) recovery dev debug
   
:
   回答介绍了相关的 debug 方法
   -

   Till Rohrmann - Can you please share your code for FF - SF - Flink as a
   lib
   
:
   一个 Flink on k8s 的 demo
   -

   How can I get the backpressure signals inside my function or operator?
   

   -

   Limit max cpu usage per TaskManager
   

   -

   flink on yarn-cluster kerberos authentication for hbase
   

   -

   What metrics can I see the root cause of "Buffer pool is destroyed"
   message?
   

   -

   PreAggregate operator with timeout trigger
   

   -

   RocksDB and local file system
   
:
   RocksDB 作为 StateBackend 的时候保存 RocksDB 数据库的位置应该是 HDFS 或者其他分布式文件系统。技术上说可以使用
   local 文件,但是在恢复的时候万一(很有可能)Task 发给的不是刚才执行的机器,那么通过 file://.. 的路径就找不到
   RocksDB 的数据,也就无法从 checkpoint 中恢复
   -

   Flink Filters have state?
   
:
   关于编写 Stateful 应用的一些基本问题

已知缺陷

   -

   Incorrect handling of FLINK_PLUGINS_DIR on Yarn
   : 1.9+ 的 plugin 加载机制在
   FLINK on YARN 上目前因为一个实现 BUG 是无效的

开发讨论

   -

   [DISCUSS] PyFlink User-Defined Function Resource Management
   

   -

   [DISCUSS] Flink Avro Cloudera Registry (FLINK-14577)
   


社区发展

   -

   [ANNOUNCE] Jark Wu is now part of the Flink PMC
   

Re: yarn-session.sh 启动 报错

2019-11-15 文章 tison
图看不见,你可以外链图床或者 gist 贴 log

Best,
tison.


李军  于2019年11月15日周五 下午4:07写道:

> 这个报错实在不知道是什么意思,哪个地方连接不上
>
>


Re: yarn-session.sh 启动 报错

2019-11-18 文章 tison
可能是你没有设置 HADOOP_CLASSPATH

参考 https://flink.apache.org/downloads.html 本页面最上部分

If you plan to use Apache Flink together with Apache Hadoop (run Flink on
YARN, connect to HDFS, connect to HBase, or use some Hadoop-based file
system connector) then select the download that bundles the matching Hadoop
version, download the optional pre-bundled Hadoop that matches your version
and place it in the lib folder of Flink, or export your HADOOP_CLASSPATH
<https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/hadoop.html>

Best,
tison.


李军  于2019年11月18日周一 下午4:31写道:

> 启动命令: ./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024
> 报错:Error: Could not find or load main class
> org.apache.flink.yarn.cli.FlinkYarnSessionCli
>
>
> ResourceManager,NodeManager都已启动。


FLINK WEEKLY 2019/46

2019-11-17 文章 tison
FLINK WEEKLY 2019/46 用户问题

   -

   关于从savepoint启动作业报错 migration for MapState currently isn't supported.
   
:
   1.10 中支持了 RocksDB StateBackend 的 MapState 演化
   -

   flink里删除cassandra的记录
   

   -

   Queryable State 查询反序列化问题
   

   -

   flink ScalarFunction 重写 getParameterTypes 方法不生效
   

   -

   编写 keyed raw state 的例子
   

   -

   手动从 checkpoint 中恢复的方法
   

   -

   Operator name and uuid best practices
   

   -

   how to setup a ha flink cluster on k8s?
   

   -

   How to unsubscribe the Apache projects and jira issues notification
   
:
   取消订阅的方式是发送任意邮件到 unsubscribe-@.apache.org
   -

   Checkpoint 完成速度慢的问题
   
:
   可能是由于 back pressure 导致的
   -

   Initialization of broadcast state before processing main stream
   

   -

   Flink-JDBC JDBCUpsertTableSink keyFields Problem
   

   -

   Propagating event time field from nested query
   

   -

   Monitor rocksDB memory usage
   


已知缺陷

   -

   Starting a TaskExecutor blocks the YarnResourceManager's main thread
   : 目前,YARN 上启动 FLINK
   集群可能由于同步申请 slot 的耗时而阻塞 FLINK RM 相当长的一段时间,这会导致 FLINK RM 和 TM 心跳超时。Yang Wang
   采取的修复是异步的申请 slot 以保证 FLINK RM 的可用性。目前 PR 正在 review 中,预计将会在 1.8.3 1.9.2 和
   1.10.0 中上线
   -

   Pyflink building failure in master and 1.9.0 version
   : Windows 上的 PyFlink
   编译不通过,这是因为路径配置采用了操作系统敏感的方法的原因

开发讨论

   -

   [DISCUSS] FLIP-27: Refactor Source Interface
   
:
   Source 接口的重构正在开发中,有可能在 1.10 里包括它的一个初步实现
   -

   [DISCUSS] FLIP-86: Improve Connector Properties
   
:
   Jark Wu 的 FLIP-86 旨在提升 Table API/SQL 连接器的配置格式
   -

   [[DISCUSSION] Kafka Metrics Reporter]: Gyula Fóra 提出了在 FLINK 中加入 Kafka
   实现的 Metrics Report 的讨论

社区发展

   -

   [DISCUSS] Releasing Flink 1.8.3
   
:
   来自阿里的工程师 Hequn Cheng 成为了 1.8.3 的 Release Manager
   -

   [DISCUSS] Release flink-shaded 9.0
   
:
   Chesnay Schepler 发起了发布 flink-shaded 9.0 的讨论


Re: yarn-session.sh 启动 报错

2019-11-18 文章 tison
你可以改一下 yarn-session.sh 的内容,看一下最后执行的命令是啥,可能 shell 里多了空格导致一些解析上的问题。

Best,
tison.


李军  于2019年11月18日周一 下午4:44写道:

> 找到问题了;
> 是我这个包:flink-shaded-hadoop-2-uber-2.8.3-7.0 (1)  名字错了;
> 但是又有一个错误不是很明白;
>
>
> 签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail81> 定制
> 在2019年11月18日 16:38,李军  写道:
>
> HADOOP_CLASSPATH 设置了;
> 启动这个好像不需要指定程序的入口类吧
>
>
> 在2019年11月18日 16:34,tison  写道:
>
> 可能是你没有设置 HADOOP_CLASSPATH
>
> 参考 https://flink.apache.org/downloads.html 本页面最上部分
>
> If you plan to use Apache Flink together with Apache Hadoop (run Flink on
> YARN, connect to HDFS, connect to HBase, or use some Hadoop-based file
> system connector) then select the download that bundles the matching Hadoop
> version, download the optional pre-bundled Hadoop that matches your version
> and place it in the lib folder of Flink, or export your HADOOP_CLASSPATH
> <
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/hadoop.html
> >
>
> Best,
> tison.
>
>
> 李军  于2019年11月18日周一 下午4:31写道:
>
> 启动命令: ./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024
> 报错:Error: Could not find or load main class
> org.apache.flink.yarn.cli.FlinkYarnSessionCli
>
>
> ResourceManager,NodeManager都已启动。
>
>


Re: ./bin/yarn-session.sh 启动; Error while running the Flink Yarn session. 错误

2019-11-18 文章 tison
你的那个图可能是因为一些参数传得不对导致 YARN 部署的时候出问题了。我之前遇到过这种情况是因为我把一个参数传错了 YARN 的 APP
起来之后抛异常

你看到在那边不停的 retry 是一个已知的 BUG,我记得最近几个版本已经修了,我找找对应的 JIRA

Best,
tison.


李军  于2019年11月18日周一 下午4:50写道:

> ./bin/yarn-session.sh 启动; Error while running the Flink Yarn session. 错误
> 报错信息图: https://blog.csdn.net/qq_37518574/article/details/103125368
>
>


FLINK WEEKLY 2019/48

2019-12-06 文章 tison
FLINK WEEKLY 2019/48 
感谢社区同学 forideal 负责编写本期 FLINK WEEKLY!

用户问题

   - 如何成为flink的contributor
   

用户可以直接在感兴趣的jira下面回复,社区可能会把这个assign给用户
   - why operator not chained?
   

   - Uid and name for Flink sources and sinks
   

用户设置了operatoruid和name,并针对用户自己的情况提了一些问题,邮件列表讨论了这些问题
   - HBase ScannerTimeoutException and double Result processing
   

用户在使用flink读取hbase的数据的时候发生了超时异常,邮件列表从源码层面分析出现这个情况的原因
   - Dynamically creating new Task Managers in YARN
   
用户想按需动态的申请taskmanager,邮件列表讨论了
   Flink on Yarn 的session mode和job mode,并针对问题给出了一些建议
   - Per Operator State Monitoring
   

用户的job被kill了,用户猜测是state导致。邮件列表讨论了某些可能监控的方法,同时讨论了有可能是
   state backend为RocksDB导致
   - Apache Flink - Throttling stream flow
   

邮件列表讨论了flink
   source限流的问题
   - Read multiline JSON/XML
   

用户询问在Flink中是否存在像spark读取多行json/xml的api,邮件列表讨论了一些方法
   - Apache Flink - Troubleshooting exception while starting the job from
   savepoint
   

用户有一个Job,在没有任何修改的前提下,无法使用
   save point重启。经过邮件讨论,用户发现有一个有状态的operator没有被分配name和uid。详细问题排查过程,参考邮件列表。
   - How to recover state from savepoint on embedded mode?
   

在flink
   embedded mode下,用户想从save point中恢复job。邮件列表讨论了可能可行的办法。有兴趣的可以去看看
   - Metrics for Task States
   


开发讨论

   - [PROPOSAL/SURVEY] Enable background cleanup for state with TTL by
   default
   

   - [DISCUSS] Disable conversion between TIMESTAMP and Long in parameters
   and results of UDXs
   

   - [DISCUSS] Support JSON functions in Flink SQL
   

   - [DISCUSS] Releasing Flink 1.8.3
   

Flink
   1.8.3 即将发版了
   - set job level TTL
   


社区发展

   - Apache Flink-shaded 9.0 released
   

   - Flink Forward Asia 2019
   

2019年11月28/29
   Flink Forward Asia 2019 在北京召开
   -


Re: flink 1.9.1 guava version conflict

2019-11-18 文章 tison
可以直接使用 flink-cep 模块吗?

如果是自己定制的 flink lib,可以提供更详细的打包过程和作出的改动吗?

这里就是不同的 guava 版本没 shaded 好导致 classloader 解析的时候出现冲突,这个问题是 adhoc
的,需要进一步的了解你【我是下载了源码之后,自己编译了flink
cep相关的jar然后引入进来。】这个过程是怎么做的。

Best,
tison.


孙森  于2019年11月19日周二 下午2:23写道:

> 补充
>
> 我是下载了源码之后,自己编译了flink
> cep相关的jar然后引入进来。看pom文件flink-shaded-guava是flink-core引入的。
>
> > 2019年11月19日 下午2:20,孙森  写道:
> >
> > 我在项目中使用flink release-1.9.1,一直出现Caused by:
> java.lang.NumberFormatException: Not a version: 9
> > java -version
> > java version "1.8.0_111"
> > Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
> > Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
> >
> > scala 2.11
> >
> > maven  3.6.1
> >
> > 具体的错误信息如下:
> > Exception in thread "main"
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
> java.lang.NumberFormatException: Not a version: 9
> >   at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
> >   at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> >   at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> >   at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
> >   at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
> >   at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.(TraversableSerializer.scala:41)
> >
> >   at
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
> >   at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
> >   at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:250)
> >   at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
> >   at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
> >   at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> >   at
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
> >
> >   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> >   at
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> >   at scala.App$$anonfun$main$1.apply(App.scala:76)
> >   at scala.App$$anonfun$main$1.apply(App.scala:76)
> >   at scala.collection.immutable.List.foreach(List.scala:381)
> >   at
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> >   at scala.App$class.main(App.scala:76)
> >
> > Caused by: java.lang.NumberFormatException: Not a version: 9
> >   at scala.util.PropertiesTrait$class.parts$1(Properties.scala:184)
> >   at
> scala.util.PropertiesTrait$class.isJavaAtLeast(Properties.scala:187)
> >   at scala.util.Properties$.isJavaAtLeast(Properties.scala:17)
> >   at
> scala.tools.util.PathResolverBase$Calculated$.javaBootClasspath(PathResolver.scala:276)
> >   at
> scala.tools.util.PathResolverBase$Calculated$.basis(PathResolver.scala:283)
> >   at
> scala.tools.util.PathResolverBase$Calculated$.containers$lzycompute(PathResolver.scala:293)
> >   at
> scala.tools.util.PathResolverBase$Calculated$.containers(PathResolver.scala:293)
> >   at
> scala.tools.util.PathResolverBase.containers(PathResolver.scala:309)
> >   at
> scala.tools.util.PathResolver.computeResult(PathResolver.scala:341)
> >   at
> scala.tools.util.PathResolver.computeResult(PathResolver.scala:332)
> >   at scala.tools.util.PathResolverBase.result(PathResolver.scala:314)
> >   at
> scala.tools.nsc.backend.JavaPlatform$class.classPath(JavaPlatform.scala:28)
> >   at
> scala.tools.nsc.Global$GlobalPlatform.classPath(Global.scala:115)
> >   at
> scala.tools.nsc.Global.scala$tools$nsc$Global$$recursiveClassPath(Global.scala:131)
> >   at scala.tools.nsc.Global.classPath(Global.scala:128)
> >   at
> scala.tools.nsc.backend.jvm.BTypesFromSymbols.(BTypesFromSymbols.scala:39)
> >   at
> scala.tools.nsc.backend.jvm.BCodeIdiomatic.(BCodeIdiomatic.scala:24)
> >   at
> scala.tools.nsc.backend.jvm.BCodeHelpers.(BCodeHelpers.scala:23

FLINK WEEKLY 2019/47

2019-11-24 文章 tison
FLINK WEEKLY 2019/47 用户问题

   -

   关于flink1.7.2checkpoint本地文件存储,设置文件个数的问题
   
:
   文件个数是 FLINK 内部逻辑,不可配置也无需配置
   -

   Cron style for checkpoint
   
:
   社区成员想要一个类似于 CRON 任务的后台定期 trigger checkpoint 的机制,邮件列表讨论了其合理性和现有方案解决实际需求的一些方式
   -

   Metrics for Task States
   
:
   社区成员在构建 FLINK 作业管理平台时对 TM 的监控指标提出了新的需求,想要获得一个 JOB 不同 TASK 处于什么状态的指标
   -

   Uid and name for Flink sources and sinks
   

   -

   Does apache flink support stream input from Postgresql?
   
:
   可以使用 JDBC connector
   -

   how to setup a ha flink cluster on k8s?
   

   -

   Flink configuration at runtime
   
:
   有些 JOB 级别的配置在最近的改动中能够在运行时提交前动态确定。但是大多数的配置还是直接从 flink-conf.yaml 中读取,不可改变
   -

   Dynamically creating new Task Managers in YARN
   
:
   FLIP-6 之后的 FLINK on YARN 已经是动态申请资源了

开发讨论

   -

   [DISCUSS] Support configure remote flink jar
   
:
   我发起了关于在 YARN 上部署作业时运行时依赖例如 FLINK 框架 JAR
   从远端直接拉取的讨论。这一讨论可以延伸到任意远端依赖上,主要想解决的问题是目前 FLINK
   在部署作业时总是要部署大量的用户依赖,这些依赖可能有很多共同的内容,却在每次部署是都需要重新上传和本地化。讨论着重在利用 YARN
   的本地化机制来优化这一开销,K8S 上有类似的初始化机制可以优化
   -

   FLINK 1.8.3 即将发布
   -

   FLINK shaded 9.0 已经发布
   -

   FLINK pulsar connector 工作
   
由
   Yijie Shen 稳步推进中
   -

   [DISCUSS] Remove old WebUI
   
:
   1.9 中引进的新版 WebUI 稳定运行中,Chesnay 提出移除旧版的 WebUI
   -

   [DISCUSS] Primary keys in Table API
   
:
   Dawid 的 FLIP-87 旨在向 TableAPI 中引入主键的概念以助于运行时优化

社区发展

   -

   [ANNOUNCE] Launch of flink-packages.org: A website to foster the Flink
   Ecosystem
   
:
   FLINK 也有了自己的生态系统网站 啦~


Re: Re: flink on yarn 指定节点开启 yarn session 报错

2019-11-26 文章 tison
确实是 historyserver.web.port 的默认值,但我记得 HistoryServer 默认是不启动的

总之可以确认下问题能否稳定复现,失败的瞬间端口有无被占用,以及换个端口能不能起来

如果还不行再找找其他原因

Best,
tison.


Yangze Guo  于2019年11月27日周三 上午11:23写道:

> 8082我记得是historyserver.web.port的默认值,很可能是冲突了改成8081呢?
>
> Best,
> Yangze Guo
>
> On Wed, Nov 27, 2019 at 11:13 AM 杨浩程  wrote:
> >
> >
> 
> > 好的,更改过的配置如下:
> >
> >
> #==
> > # Rest & web frontend
> >
> #==
> >
> >
> > # The port to which the REST client connects to. If rest.bind-port has
> > # not been specified, then the server will bind to this port as well.
> > #
> > rest.port: 8082
> >
> >
> > # The address to which the REST client will connect to
> > #
> > rest.address: bigdata-test-8
> >
> >
> > # Port range for the REST and web server to bind to.
> > #
> > #rest.bind-port: 8080-8090
> > rest.bind-port: 8082
> >
> >
> > # The address that the REST & web server binds to
> > #
> > rest.bind-address: bigdata-test-8
> >
> >
> > # Flag to specify whether job submission is enabled from the web-based
> > # runtime monitor. Uncomment to disable.
> >
> >
> > #web.submit.enable: false
> > #web.upload.dir: /data/flink-1.9.0/upload_jars/
> >
> 
> >
> >
> >
> >
> >
> > 在 2019-11-27 11:01:47,"Yangze Guo"  写道:
> > >您好,目前ML不支持图片,能将配置贴上来么?
> > >另外看报错信息像是8082端口被占用导致rest服务起不起来
> > >
> > >Best,
> > >Yangze Guo
> > >
> > >
> > >On Wed, Nov 27, 2019 at 10:56 AM 杨浩程  wrote:
> > >
> > >> 各位大佬好!
> > >>请教各位个问题:使用的flink 版本1.9.0。测试flink on yarn 指定节点 开启yarn session
> 会话。
> > >>希望yarnsession开启的集群 jobmanager开启在我指定的节点上。
> > >>更改的配置如下:
> > >> 报错信息如下:
> > >>
> > >>
> ===
> > >> 2019-11-27 10:35:45,640 INFO
> > >>
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
> > >> - backgroundOperati
> > >> 2019-11-27 10:35:45,650 INFO
> > >> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  -
> > >> Session: 0x36ea133e2b51435 closed
> > >> 2019-11-27 10:35:45,650 INFO
> > >> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
> > >> EventThread shut down for session: 0x3
> > >> 2019-11-27 10:35:45,651 INFO
> > >> org.apache.flink.runtime.rpc.akka.AkkaRpcService  -
> Stopping
> > >> Akka RPC service.
> > >> 2019-11-27 10:35:45,659 INFO
> > >> org.apache.flink.runtime.rpc.akka.AkkaRpcService  -
> Stopping
> > >> Akka RPC service.
> > >> 2019-11-27 10:35:45,666 INFO
> > >> akka.remote.RemoteActorRefProvider$RemotingTerminator -
> Shutting
> > >> down remote daemon.
> > >> 2019-11-27 10:35:45,669 INFO
> > >> akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote
> > >> daemon shut down; proceeding with fl
> > >> 2019-11-27 10:35:45,676 INFO
> > >> akka.remote.RemoteActorRefProvider$RemotingTerminator -
> Shutting
> > >> down remote daemon.
> > >> 2019-11-27 10:35:45,677 INFO
> > >> akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote
> > >> daemon shut down; proceeding with fl
> > >> 2019-11-27 10:35:45,696 INFO
> > >> akka.remote.RemoteActorRefProvider$RemotingTerminator -
> Remoting
> > >> shut down.
> > >> 2019-11-27 10:35:45,696 INFO
> > >> akka.remote.RemoteActorRefProvider$RemotingTerminator -
> Remoting
> > >> shut down.
> > >> 2019-11-27 10:35:45,709 INFO
> > >> org.apache.flink.runtime.rpc.akka.AkkaRpcService  -
> Stopped
> > >> Akka RPC service.
> > >> 2019-11-27 10:35:45,713 INFO
> > >> org.apache.flink.runtime.rpc.akka.AkkaRpcService  -
> Stopped
> > >> Akka RPC service.
> > >> 2019-11-27 10:35:45,714 ERROR
> > >> org.a

Re: 本地checkpoint 文件190G了

2019-11-29 文章 tison
retain 20 干啥...默认是 1 基本也还行,我还没见过其他生产上超过 5 的...

你的 checkpoint interval 是多少(这个不影响最终大小但是可能很快你就看到 checkpoint 文件大小涨上去),然后单次
checkpoint 大小多大知道吗?

听上去像是预期行为因为你配置就是要保留很多的 checkpoint,实际上自动恢复的时候只会读最后一个

Best,
tison.


sun <1392427...@qq.com> 于2019年11月30日周六 上午10:33写道:

> retain 配置的20,还需要在程序里面配置什么吗,increase已经开启了,请问程序要怎么配置 -- 原始邮件
> ------
> 发件人: "tison"
> 发送时间: 2019年11月30日(星期六) 上午10:31
> 收件人: "user-zh";
> 主题: Re: 本地checkpoint 文件190G了
>
>
> 你的 retain 数量设置是多少,然后程序里 state 本身写了多大的 state,有没开启 incremental checkpoint
>
> Best,
> tison.
>
>
> sun <1392427...@qq.com> 于2019年11月30日周六 上午10:28写道:
>
> > 花了两天时间-- 原始邮 --
> > 发人: "Henry"
> > 发送时间: 2019年11月30日(星期六) 上午10:25
> > 收人: "user-zh";
> > 主题: Re:本地checkpoint 文190G了
> >
> >
> >
> > 是不是很快就增长到那么多了?
> >
> >
> >
> >
> >
> > 在 2019-11-30 10:13:27,"sun" <1392427...@qq.com> 写道:
> >
> > 求助,我的文夹一直在长大
> >
> >
> >
> >
> > 发自我的iPhone


Re: 本地checkpoint 文件190G了

2019-11-29 文章 tison
你的 retain 数量设置是多少,然后程序里 state 本身写了多大的 state,有没开启 incremental checkpoint

Best,
tison.


sun <1392427...@qq.com> 于2019年11月30日周六 上午10:28写道:

> 花了两天时间-- 原始邮件 --
> 发件人: "Henry"
> 发送时间: 2019年11月30日(星期六) 上午10:25
> 收件人: "user-zh";
> 主题: Re:本地checkpoint 文件190G了
>
>
>
> 是不是很快就增长到那么多了?
>
>
>
>
>
> 在 2019-11-30 10:13:27,"sun" <1392427...@qq.com> 写道:
>
> 求助,我的文件夹一直在长大
>
>
>
>
> 发自我的iPhone


Re: 回复: 本地checkpoint 文件190G了

2019-11-29 文章 tison
retain 调小是你的场景比较简单的方法,1 2 3 都行,你可以试试...

Best,
tison.


sun <1392427...@qq.com> 于2019年11月30日周六 下午2:08写道:

> 好的,我主要想知道,怎么定时清理那些我用不到的checkpoint 文件,怎么让我的本地checkpoint
> 不会一直长大-- 原始邮件 --
> 发件人: "tangjunli...@huitongjy.com"
> 发送时间: 2019年11月30日(星期六) 下午2:06
> 收件人: "user-zh";
> 主题: 回复: 回复: 本地checkpoint 文件190G了
>
>
> 如果处理数据没有达到一定量级,建议state.backend.incremental 设为false
>
>
>
> tangjunli...@huitongjy.com
>
> 发人: sun
> 发送时间: 2019-11-30 14:05
> 收人: user-zh
> 主题: 回复: 本地checkpoint 文190G了
> rocksdb ,设置的true-- 原始邮 --
> 发人: "tangjunli...@huitongjy.com"
> 发送时间: 2019年11月30日(星期六) 下午2:03
> 收人: "user-zh";
> 主题: 回复: 本地checkpoint 文190G了
>
>
> 用的什么backend? state.backend.incremental  这个参数设置的什么?
>
>
>
> tangjunli...@huitongjy.com
> 发人: sun
> 发送时间: 2019-11-30 10:13
> 收人: flink; user-zh-subscribe
> 主题: 本地checkpoint 文190G了
> 求助,我的文夹一直在长大
>
>
>
>
> 发自我的iPhone


Re: 回复:Kafka库和Flink的反向类加载方法不兼容

2019-11-23 文章 tison
最近我在梳理这部分的代码,刚好看到这个邮件就敦促这写了这篇小短文 https://zhuanlan.zhihu.com/p/93374622
简要介绍了我理解的
FLINK 依赖上传和加载的过程

过程中发现官方文档中有一节
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
 已经把很多设计原则和坑都列出来了,不妨一看

Best,
tison.


巫旭阳  于2019年11月23日周六 下午9:19写道:

> 感谢回答
>  出现这个问题也是因为我把flink connect 的jar包打成lib 放在flink 和 hadoop
> 的classpath下面(缩小打出来的应用程序包),出现这个报错。我大概理解这个class加载的意思了。
> 在 2019-11-23 17:16:51,"1193216154" <1193216...@qq.com> 写道:
> >parent first就是先加载环境的jar包,再加载用户的jar包(就是自己写的flink程序),children
> first就是反过来。flink默认配置是chikdren
> first,建议不要动这个配置。而是检查一下自己的flink程序的pom依赖和flink lib下面的jar包有没有冲突
> >
> >
> >
> >
> >
> >---原始邮件---
> >发件人: "aven.wu" >发送时间: 2019年11月23日(星期六) 下午4:57
> >收件人: "user-zh@flink.apache.org" >主题: Kafka库和Flink的反向类加载方法不兼容
> >
> >
> >报错如下
> >cannot assign instance of org.apache.commons.collections.map.LinkedMap to
> field
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit
> of type org.apache.commons.collections.map.LinkedMap in instance of
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
> >
> >修改flink-conf.yaml
> >classloader.resolve-order: parent-first
> >
> >哪位大佬能解释一下这个反向类加载是什么意思?
> >
> >发送自Aven.wu
>


Re: CliFrontend 未优先加载用户jar包中的class

2020-03-03 文章 tison
也是一种 hack 的方法,不过社区肯定不能在 master 上这么搞就是了(x

Best,
tison.


aven.wu  于2020年3月3日周二 下午4:44写道:

> 感谢回答
> 后来我查了Flink run脚本的classpath设置,我修改了脚本将我的jar包指定在flink classpath的最前面得以解决问题
>
> Best
> Aven
>
> 发件人: tison
> 发送时间: 2020年3月3日 14:16
> 收件人: user-zh
> 主题: Re: CliFrontend 未优先加载用户jar包中的class
>
>
> https://github.com/apache/flink/commit/0f30c263eebd2fc3ecbeae69a4ce9477e1d5d774
>
> Best,
> tison.
>
>
> tison  于2020年3月3日周二 下午2:13写道:
>
> > 1.9.2 和 1.10 上已经修复此问题,修改可参考
> >
> > https://issues.apache.org/jira/browse/FLINK-13749
> >
> > Best,
> > tison.
> >
> >
> > aven.wu  于2020年3月3日周二 下午2:04写道:
> >
> >> 组件版本 Hadoop 2.7.3,flink 1.9.1 ,elasticsearch6.5。
> >> 该问题的起源是因为程序我的用户程序用Jackson,并依赖了Elasticsearch rest client
> >> ,在Yarn集群上提交任务的时候出现了如下异常:
> >> java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
> >> at
> >>
> org.elasticsearch.common.xcontent.json.JsonXContent.(JsonXContent.java:57)
> >> 后上网查询后推论有可能是jackson版本问题,于是打印了类加载路径:
> >> --main class jackson class load before
> >> run--
> >> file:/usr/**/hadoop/lib/jackson-databind-2.2.3.jar
> >> 果然是从hadoop的classpath下加载了2.2.3版本
> >>
> >> 之后查看flink run命令入口程序
> >> CliFrontend#bulidProgram line 799
> >> PackagedProgram#PackagedProgram line 221
> >> JobWithJars#BuildUserCodeClassLoad line 142
> >> return FlinkUserCodeClassLoaders.parentFirst(urls, parent);
> >> 默认使用parentFirst,根据官方文档里面描述的反向类加载,应该是首先从classpath下加载Class,而不是从user
> >> jar包中加载类。
> >> 请问如何修改此处的类加载顺序,优先从user jar 中加载class
> >>
> >> Best
> >> Aven
> >>
> >>
>
>


Re: CliFrontend 未优先加载用户jar包中的class

2020-03-02 文章 tison
1.9.2 和 1.10 上已经修复此问题,修改可参考

https://issues.apache.org/jira/browse/FLINK-13749

Best,
tison.


aven.wu  于2020年3月3日周二 下午2:04写道:

> 组件版本 Hadoop 2.7.3,flink 1.9.1 ,elasticsearch6.5。
> 该问题的起源是因为程序我的用户程序用Jackson,并依赖了Elasticsearch rest client
> ,在Yarn集群上提交任务的时候出现了如下异常:
> java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
> at
> org.elasticsearch.common.xcontent.json.JsonXContent.(JsonXContent.java:57)
> 后上网查询后推论有可能是jackson版本问题,于是打印了类加载路径:
> --main class jackson class load before
> run--
> file:/usr/**/hadoop/lib/jackson-databind-2.2.3.jar
> 果然是从hadoop的classpath下加载了2.2.3版本
>
> 之后查看flink run命令入口程序
> CliFrontend#bulidProgram line 799
> PackagedProgram#PackagedProgram line 221
> JobWithJars#BuildUserCodeClassLoad line 142
> return FlinkUserCodeClassLoaders.parentFirst(urls, parent);
> 默认使用parentFirst,根据官方文档里面描述的反向类加载,应该是首先从classpath下加载Class,而不是从user jar包中加载类。
> 请问如何修改此处的类加载顺序,优先从user jar 中加载class
>
> Best
> Aven
>
>


Re: CliFrontend 未优先加载用户jar包中的class

2020-03-02 文章 tison
https://github.com/apache/flink/commit/0f30c263eebd2fc3ecbeae69a4ce9477e1d5d774

Best,
tison.


tison  于2020年3月3日周二 下午2:13写道:

> 1.9.2 和 1.10 上已经修复此问题,修改可参考
>
> https://issues.apache.org/jira/browse/FLINK-13749
>
> Best,
> tison.
>
>
> aven.wu  于2020年3月3日周二 下午2:04写道:
>
>> 组件版本 Hadoop 2.7.3,flink 1.9.1 ,elasticsearch6.5。
>> 该问题的起源是因为程序我的用户程序用Jackson,并依赖了Elasticsearch rest client
>> ,在Yarn集群上提交任务的时候出现了如下异常:
>> java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
>> at
>> org.elasticsearch.common.xcontent.json.JsonXContent.(JsonXContent.java:57)
>> 后上网查询后推论有可能是jackson版本问题,于是打印了类加载路径:
>> --main class jackson class load before
>> run--
>> file:/usr/**/hadoop/lib/jackson-databind-2.2.3.jar
>> 果然是从hadoop的classpath下加载了2.2.3版本
>>
>> 之后查看flink run命令入口程序
>> CliFrontend#bulidProgram line 799
>> PackagedProgram#PackagedProgram line 221
>> JobWithJars#BuildUserCodeClassLoad line 142
>> return FlinkUserCodeClassLoaders.parentFirst(urls, parent);
>> 默认使用parentFirst,根据官方文档里面描述的反向类加载,应该是首先从classpath下加载Class,而不是从user
>> jar包中加载类。
>> 请问如何修改此处的类加载顺序,优先从user jar 中加载class
>>
>> Best
>> Aven
>>
>>


Re: Re: flink on yarn任务启动报错 The assigned slot container_e10_1579661300080_0005_01_000002_0 was removed.

2020-01-23 文章 tison
你上面的是 taskmanager.err,需要的是 taskmanager.log

Best,
tison.


郑 洁锋  于2020年1月23日周四 下午10:22写道:

> 之前挂过 后面启动的时候 是checkpoints的文件丢了? 你是这个意思吗?
>
> 
> zjfpla...@hotmail.com
>
> 发件人: zhisheng<mailto:zhisheng2...@gmail.com>
> 发送时间: 2020-01-22 16:45
> 收件人: user-zh<mailto:user-zh@flink.apache.org>
> 主题: Re: flink on yarn任务启动报错 The assigned slot
> container_e10_1579661300080_0005_01_02_0 was removed.
> 应该是你作业之前挂过了
>
> 郑 洁锋  于2020年1月22日周三 上午11:16写道:
>
> > 大家好,
> >flink on yarn任务启动时,发现报错了The assigned slot
> > container_e10_1579661300080_0005_01_02_0 was removed.
> >环境:flink1.8.1,cdh5.14.2,kafka0.10,jdk1.8.0_241
> >
> > flink版本为1.8.1,yarn上的日志:
> >
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:
> >
> 
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Starting
> > YarnJobClusterEntrypoint (Version: , Rev:7297bac,
> Date:24.06.2019
> > @ 23:04:28 CST)
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  OS current user:
> > cloudera-scm
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Current
> > Hadoop/Kerberos user: root
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JVM: Java
> > HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.241-b07
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Maximum heap size:
> > 406 MiBytes
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JAVA_HOME:
> > /usr/java/default
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Hadoop version:
> 2.6.5
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JVM Options:
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: -Xms424m
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: -Xmx424m
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Program Arguments:
> > (none)
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Classpath:
> >
> core-1.8.0_release.jar:flink-shaded-hadoop-2-uber-2.6.5-7.0.jar:kafka10-source-1.8.0_release.jar:log4j-1.2.17.jar:mysql-all-side-1.8.0_release.jar:mysql-sink-1.8.0_release.jar:slf4j-log4j12-1.7.15.jar:sql.launcher-1.0-SNAPSHOT.jar:flink.jar:flink-conf.yaml:job.graph::/etc/hadoop/conf.cloudera.yarn:/run/cloudera-scm-agent/process/1129-yarn-NODEMANAGER:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-annotations.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-auth.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-aws.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-azure-datalake.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-common-tests.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-common.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-nfs.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-nfs-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-common-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-common-2.6.0-cdh5.14.2-tests.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-azure-datalake-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-aws-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-auth-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-annotations-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-format.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-format-sources.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-format-javadoc.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-tools.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-thrift.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-test-hadoop2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-scrooge_2.10.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-scala_2.10.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-protobuf.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-pig.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-pig-bundle.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-jackson.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-hadoop.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-hadoop-bundle.jar:/opt/cloudera/parcels/CDH-5.14.

Re: Re: flink-1.10.0通过run -m yarn-cluster提交任务时异常

2020-02-20 文章 tison
常见问题。

现在 Flink 不 bundle hadoop,所以你要设置下 HADOOP_CLASSPATH

Best,
tison.


amenhub  于2020年2月18日周二 上午11:51写道:

> hi, Weihua
>
>
> 如你所说,我想要通过flink on yarn的run方式提交任务到集群上,但是当我运行./bin/flink run -m
> yarn-cluster ../examples/batch/WordCount.jar ,还是一样的错误,
> 日志信息只有这么一些;如果按您所说,是因为没有成功加载FlinkYarnSessionCli导致的,那导致没有成功加载的原因有哪些方面呢?谢谢!
>
>
> 祝好,amenhub
>
>
>
>
>
>
>
> 在 2020-02-18 11:29:13,"Weihua Hu"  写道:
> >Hi, amenhub
> >
> >你应该是要把作业提交到 yarn 上吧。这个错误应该没有正确的加载 FlinkYarnSessionCli
> 导致的,这些日志不是失败的根因。可以多提供一些日志看看。
> >
> >
> >Best
> >Weihua Hu
> >
> >> 2020年2月18日 10:56,amenhub  写道:
> >>
> >> parseHostPortAddress
> >
>


Re: Re: flink on yarn jdk版本问题

2020-01-14 文章 tison
玄学问题,升级 JDK 小版本可接,或与类型擦除有关

你可以share一下 JM 侧的日志,应该有作业执行异常

Best,
tison.


郑 洁锋  于2020年1月15日周三 下午2:17写道:

> Hi,
>
> 非常感谢,可以了,我在flink-conf.yaml中添加了如下配置项即可正常运行 yarn-session.sh了,且Flink 
> Dashboard也能正常查看了
>
> containerized.master.env.JAVA_HOME: /usr/java/jdk1.8.0_25/
>
>  containerized.taskmanager.env.JAVA_HOME: /usr/java/jdk1.8.0_25/
>
>
> 但是在运行官方例子https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/getting-started/tutorials/local_setup.html#run-the-example,测试时报错了:
>
>
> bin/flink run examples/streaming/SocketWindowWordCount.jar  --port 9000
> 2020-01-15 00:46:25,574 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - Found Yarn properties file under /tmp/.yarn-properties-root.
> 2020-01-15 00:46:25,574 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - Found Yarn properties file under /tmp/.yarn-properties-root.
> 2020-01-15 00:46:26,097 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - YARN properties set default parallelism to 4
> 2020-01-15 00:46:26,097 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - YARN properties set default parallelism to 4
> YARN properties set default parallelism to 4
> 2020-01-15 00:46:26,293 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2020-01-15 00:46:26,293 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2020-01-15 00:46:26,375 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Found 
> application JobManager host name 'tdh-2' and port '8181' from supplied 
> application id 'application_1578968334899_0010'
> Starting execution of program
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Could not 
> retrieve the execution result. (JobID: aaf4720f0c432de8d91b848838589c62)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>   at 
> org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
>   at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>   at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>   at 
> java.util.concurrent

Re: Re: MiniCluster问题

2020-01-15 文章 tison
你这完全是把几个概念混在一起了,MiniCluster 就是一个集群,是一个内部的部署模式,跟 standalone
是平行的概念。我看不懂你要干什么,但是就你的问题来说,我上面说了,就是你 MiniCluster new 出来之后没
start。但我不确定这个是不是你要的效果,MiniCluster 和 standalone 是平行的两种东西。

Best,
tison.


郑 洁锋  于2020年1月16日周四 下午2:27写道:

> 因为我这边看他是standalone,然后看到flink官方文档里面部署模式也有standalone模式,所以按照那个模式部署后来尝试
>
> 
> zjfpla...@hotmail.com
>
> 发件人: 郑 洁锋<mailto:zjfpla...@hotmail.com>
> 发送时间: 2020-01-16 14:24
> 收件人: user-zh<mailto:user-zh@flink.apache.org>
> 主题: Re: Re: MiniCluster问题
> 这是完整的到启动的代码
>
> public class ClusterClientFactory {
>
> public static ClusterClient createClusterClient(Options
> launcherOptions) throws Exception {
> String mode = launcherOptions.getMode();
> if(mode.equals(ClusterMode.standalone.name())) {
> return createStandaloneClient(launcherOptions);
> } else if(mode.equals(ClusterMode.yarn.name())) {
> return createYarnClient(launcherOptions,mode);
> }
> throw new IllegalArgumentException("Unsupported cluster client
> type: ");
> }
>
> public static ClusterClient createStandaloneClient(Options
> launcherOptions) throws Exception {
> String flinkConfDir = launcherOptions.getFlinkconf();
> Configuration config =
> GlobalConfiguration.loadConfiguration(flinkConfDir);
> MiniClusterConfiguration.Builder configBuilder = new
> MiniClusterConfiguration.Builder();
> configBuilder.setConfiguration(config);
> MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> MiniClusterClient clusterClient = new MiniClusterClient(config,
> miniCluster);
> LeaderConnectionInfo connectionInfo =
> clusterClient.getClusterConnectionInfo();
> InetSocketAddress address =
> AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress());
> config.setString(JobManagerOptions.ADDRESS,
> address.getAddress().getHostName());
> config.setInteger(JobManagerOptions.PORT, address.getPort());
> clusterClient.setDetached(true);
> return clusterClient;
> }
>
>
> 启动类中:
>
> ClusterClient clusterClient =
> ClusterClientFactory.createClusterClient(launcherOptions);
> clusterClient.run(program, 1);
> clusterClient.shutdown();
>
> 
> zjfpla...@hotmail.com
>
> 发件人: tison<mailto:wander4...@gmail.com>
> 发送时间: 2020-01-16 13:31
> 收件人: user-zh<mailto:user-zh@flink.apache.org>
> 主题: Re: Re: MiniCluster问题
> MiniCluster miniCluster = new MiniCluster(configBuilder.build());
>
> miniCluster.start();
>
>
> MiniClusterClient clusterClient = new MiniClusterClient(config,
> miniCluster)
> ;
>
> Best,
> tison.
>
>
> tison  于2020年1月16日周四 下午1:30写道:
>
> > 跟集群无关
> > Best,
> > tison.
> >
> >
> > tison  于2020年1月16日周四 下午1:30写道:
> >
> >> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖
> >>
> >> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> 郑 洁锋  于2020年1月16日周四 下午1:18写道:
> >>
> >>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
> >>> 我是通过bin/start-cluster.sh启动的flink standalone集群
> >>>
> >>>
> >>> 
> >>> zjfpla...@hotmail.com
> >>>
> >>> 发件人: tison<mailto:wander4...@gmail.com>
> >>> 发送时间: 2020-01-16 12:39
> >>> 收件人: user-zh<mailto:user-zh@flink.apache.org>
> >>> 主题: Re: MiniCluster问题
> >>> 你 MiniCluster 要 start 啊(x
> >>>
> >>> Best,
> >>> tison.
> >>>
> >>>
> >>> 郑 洁锋  于2020年1月16日周四 上午11:38写道:
> >>>
> >>> > MiniCluster代码执行过程中报错:
> >>> >
> >>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> >>> > SLF4J: Defaulting to no-operation (NOP) logger implementation
> >>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> >>> further details.
> >>> > Exception in thread "main" java.lang.IllegalStateException:
> >>> MiniCluster is not yet running.
> >>> > at
> >>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> >>> > at
> >>>
> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
> >>> > at
> >>>
> org.apache.flink.client.program.MiniClusterClient.(MiniClusterClient.java:61)
> >>> > at
&

Re: Re: MiniCluster问题

2020-01-15 文章 tison
是的,MiniCluster 会在同一个进程里起 JM TM,是一个主要用于测试的集群

standalone 的意思是没有接 YARN 这种资源管理框架,TM 由用户自己手动起,是一个可用于生产的集群

Best,
tison.


郑 洁锋  于2020年1月16日周四 下午2:39写道:

> 我是不是可以理解为是local的cluster,本身会去启动一个flink cluster(当前服务器模拟分布式环境),无需单独部署一个flink集群
>
> 
> zjfpla...@hotmail.com
>
> 发件人: tison<mailto:wander4...@gmail.com>
> 发送时间: 2020-01-16 14:29
> 收件人: user-zh<mailto:user-zh@flink.apache.org>
> 主题: Re: Re: MiniCluster问题
> 你这完全是把几个概念混在一起了,MiniCluster 就是一个集群,是一个内部的部署模式,跟 standalone
> 是平行的概念。我看不懂你要干什么,但是就你的问题来说,我上面说了,就是你 MiniCluster new 出来之后没
> start。但我不确定这个是不是你要的效果,MiniCluster 和 standalone 是平行的两种东西。
>
> Best,
> tison.
>
>
> 郑 洁锋  于2020年1月16日周四 下午2:27写道:
>
> > 因为我这边看他是standalone,然后看到flink官方文档里面部署模式也有standalone模式,所以按照那个模式部署后来尝试
> >
> > 
> > zjfpla...@hotmail.com
> >
> > 发件人: 郑 洁锋<mailto:zjfpla...@hotmail.com>
> > 发送时间: 2020-01-16 14:24
> > 收件人: user-zh<mailto:user-zh@flink.apache.org>
> > 主题: Re: Re: MiniCluster问题
> > 这是完整的到启动的代码
> >
> > public class ClusterClientFactory {
> >
> > public static ClusterClient createClusterClient(Options
> > launcherOptions) throws Exception {
> > String mode = launcherOptions.getMode();
> > if(mode.equals(ClusterMode.standalone.name())) {
> > return createStandaloneClient(launcherOptions);
> > } else if(mode.equals(ClusterMode.yarn.name())) {
> > return createYarnClient(launcherOptions,mode);
> > }
> > throw new IllegalArgumentException("Unsupported cluster client
> > type: ");
> > }
> >
> > public static ClusterClient createStandaloneClient(Options
> > launcherOptions) throws Exception {
> > String flinkConfDir = launcherOptions.getFlinkconf();
> > Configuration config =
> > GlobalConfiguration.loadConfiguration(flinkConfDir);
> > MiniClusterConfiguration.Builder configBuilder = new
> > MiniClusterConfiguration.Builder();
> > configBuilder.setConfiguration(config);
> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> > MiniClusterClient clusterClient = new MiniClusterClient(config,
> > miniCluster);
> > LeaderConnectionInfo connectionInfo =
> > clusterClient.getClusterConnectionInfo();
> > InetSocketAddress address =
> > AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress());
> > config.setString(JobManagerOptions.ADDRESS,
> > address.getAddress().getHostName());
> > config.setInteger(JobManagerOptions.PORT, address.getPort());
> > clusterClient.setDetached(true);
> > return clusterClient;
> > }
> >
> >
> > 启动类中:
> >
> > ClusterClient clusterClient =
> > ClusterClientFactory.createClusterClient(launcherOptions);
> > clusterClient.run(program, 1);
> > clusterClient.shutdown();
> >
> > ________
> > zjfpla...@hotmail.com
> >
> > 发件人: tison<mailto:wander4...@gmail.com>
> > 发送时间: 2020-01-16 13:31
> > 收件人: user-zh<mailto:user-zh@flink.apache.org>
> > 主题: Re: Re: MiniCluster问题
> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> >
> > miniCluster.start();
> >
> >
> > MiniClusterClient clusterClient = new MiniClusterClient(config,
> > miniCluster)
> > ;
> >
> > Best,
> > tison.
> >
> >
> > tison  于2020年1月16日周四 下午1:30写道:
> >
> > > 跟集群无关
> > > Best,
> > > tison.
> > >
> > >
> > > tison  于2020年1月16日周四 下午1:30写道:
> > >
> > >> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖
> > >>
> > >> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊
> > >>
> > >> Best,
> > >> tison.
> > >>
> > >>
> > >> 郑 洁锋  于2020年1月16日周四 下午1:18写道:
> > >>
> > >>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
> > >>> 我是通过bin/start-cluster.sh启动的flink standalone集群
> > >>>
> > >>>
> > >>> 
> > >>> zjfpla...@hotmail.com
> > >>>
> > >>> 发件人: tison<mailto:wander4...@gmail.com>
> > >>> 发送时间: 2020-01-16 12:39
> > >>> 收件人: user-zh<mailto:user-zh@flink.apache.org>
> > >>> 主题: Re: MiniCluster问题
> > >>> 你 MiniCluster 要 start 啊(x
> > >>>
> > >>> B

Re: MiniCluster问题

2020-01-15 文章 tison
你 MiniCluster 要 start 啊(x

Best,
tison.


郑 洁锋  于2020年1月16日周四 上午11:38写道:

> MiniCluster代码执行过程中报错:
>
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Exception in thread "main" java.lang.IllegalStateException: MiniCluster is 
> not yet running.
> at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
> at 
> org.apache.flink.client.program.MiniClusterClient.(MiniClusterClient.java:61)
> at 
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
> at 
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
> at 
> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
>
> 报错段代码如下:
>
> Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
> MiniClusterConfiguration.Builder configBuilder = new 
> MiniClusterConfiguration.Builder();
> configBuilder.setConfiguration(config);
> MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);
>
> 其中flinkConfDir为/opt/flink/conf
>
>
> flink standalone HA集群信息如下:
> --
> zjfpla...@hotmail.com
>
>
>


Re: Re: MiniCluster问题

2020-01-15 文章 tison
MiniCluster miniCluster = new MiniCluster(configBuilder.build());

miniCluster.start();


MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster)
;

Best,
tison.


tison  于2020年1月16日周四 下午1:30写道:

> 跟集群无关
> Best,
> tison.
>
>
> tison  于2020年1月16日周四 下午1:30写道:
>
>> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖
>>
>> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊
>>
>> Best,
>> tison.
>>
>>
>> 郑 洁锋  于2020年1月16日周四 下午1:18写道:
>>
>>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
>>> 我是通过bin/start-cluster.sh启动的flink standalone集群
>>>
>>>
>>> 
>>> zjfpla...@hotmail.com
>>>
>>> 发件人: tison<mailto:wander4...@gmail.com>
>>> 发送时间: 2020-01-16 12:39
>>> 收件人: user-zh<mailto:user-zh@flink.apache.org>
>>> 主题: Re: MiniCluster问题
>>> 你 MiniCluster 要 start 啊(x
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> 郑 洁锋  于2020年1月16日周四 上午11:38写道:
>>>
>>> > MiniCluster代码执行过程中报错:
>>> >
>>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>>> > SLF4J: Defaulting to no-operation (NOP) logger implementation
>>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
>>> further details.
>>> > Exception in thread "main" java.lang.IllegalStateException:
>>> MiniCluster is not yet running.
>>> > at
>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>> > at
>>> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
>>> > at
>>> org.apache.flink.client.program.MiniClusterClient.(MiniClusterClient.java:61)
>>> > at
>>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
>>> > at
>>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
>>> > at
>>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
>>> >
>>> > 报错段代码如下:
>>> >
>>> > Configuration config =
>>> GlobalConfiguration.loadConfiguration(flinkConfDir);
>>> > MiniClusterConfiguration.Builder configBuilder = new
>>> MiniClusterConfiguration.Builder();
>>> > configBuilder.setConfiguration(config);
>>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
>>> > MiniClusterClient clusterClient = new MiniClusterClient(config,
>>> miniCluster);
>>> >
>>> > 其中flinkConfDir为/opt/flink/conf
>>> >
>>> >
>>> > flink standalone HA集群信息如下:
>>> > --
>>> > zjfpla...@hotmail.com
>>> >
>>> >
>>> >
>>>
>>


Re: Re: MiniCluster问题

2020-01-15 文章 tison
跟集群无关
Best,
tison.


tison  于2020年1月16日周四 下午1:30写道:

> 1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖
>
> 2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊
>
> Best,
> tison.
>
>
> 郑 洁锋  于2020年1月16日周四 下午1:18写道:
>
>> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
>> 我是通过bin/start-cluster.sh启动的flink standalone集群
>>
>>
>> ____
>> zjfpla...@hotmail.com
>>
>> 发件人: tison<mailto:wander4...@gmail.com>
>> 发送时间: 2020-01-16 12:39
>> 收件人: user-zh<mailto:user-zh@flink.apache.org>
>> 主题: Re: MiniCluster问题
>> 你 MiniCluster 要 start 啊(x
>>
>> Best,
>> tison.
>>
>>
>> 郑 洁锋  于2020年1月16日周四 上午11:38写道:
>>
>> > MiniCluster代码执行过程中报错:
>> >
>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>> > SLF4J: Defaulting to no-operation (NOP) logger implementation
>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
>> further details.
>> > Exception in thread "main" java.lang.IllegalStateException: MiniCluster
>> is not yet running.
>> > at
>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>> > at
>> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
>> > at
>> org.apache.flink.client.program.MiniClusterClient.(MiniClusterClient.java:61)
>> > at
>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
>> > at
>> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
>> > at
>> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
>> >
>> > 报错段代码如下:
>> >
>> > Configuration config =
>> GlobalConfiguration.loadConfiguration(flinkConfDir);
>> > MiniClusterConfiguration.Builder configBuilder = new
>> MiniClusterConfiguration.Builder();
>> > configBuilder.setConfiguration(config);
>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
>> > MiniClusterClient clusterClient = new MiniClusterClient(config,
>> miniCluster);
>> >
>> > 其中flinkConfDir为/opt/flink/conf
>> >
>> >
>> > flink standalone HA集群信息如下:
>> > --
>> > zjfpla...@hotmail.com
>> >
>> >
>> >
>>
>


Re: Re: MiniCluster问题

2020-01-15 文章 tison
1. 这是个内部实现类,文档是一个问题,但最好不要直接依赖

2. ?你截图的代码片段在哪?我看是你自己代码创建使用的啊

Best,
tison.


郑 洁锋  于2020年1月16日周四 下午1:18写道:

> MiniCluster有没有相关文档,官方文档里面看到部署模式里面没有相关的内容?
> 我是通过bin/start-cluster.sh启动的flink standalone集群
>
>
> 
> zjfpla...@hotmail.com
>
> 发件人: tison<mailto:wander4...@gmail.com>
> 发送时间: 2020-01-16 12:39
> 收件人: user-zh<mailto:user-zh@flink.apache.org>
> 主题: Re: MiniCluster问题
> 你 MiniCluster 要 start 啊(x
>
> Best,
> tison.
>
>
> 郑 洁锋  于2020年1月16日周四 上午11:38写道:
>
> > MiniCluster代码执行过程中报错:
> >
> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> > SLF4J: Defaulting to no-operation (NOP) logger implementation
> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> further details.
> > Exception in thread "main" java.lang.IllegalStateException: MiniCluster
> is not yet running.
> > at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> > at
> org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
> > at
> org.apache.flink.client.program.MiniClusterClient.(MiniClusterClient.java:61)
> > at
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
> > at
> com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
> > at
> com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
> >
> > 报错段代码如下:
> >
> > Configuration config =
> GlobalConfiguration.loadConfiguration(flinkConfDir);
> > MiniClusterConfiguration.Builder configBuilder = new
> MiniClusterConfiguration.Builder();
> > configBuilder.setConfiguration(config);
> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
> > MiniClusterClient clusterClient = new MiniClusterClient(config,
> miniCluster);
> >
> > 其中flinkConfDir为/opt/flink/conf
> >
> >
> > flink standalone HA集群信息如下:
> > --
> > zjfpla...@hotmail.com
> >
> >
> >
>


Re: 支持flink.yarn.jars 参数

2020-01-20 文章 tison
有这个想法,目前腾讯内部已经实现了相关功能,我记得 Yang Wang(in cc) 在阿里也做了类似的功能,这个要做干净可能需要连着跟
YarnClusterDescriptor 的代码都整理一下。确实也看到这个需求常常被提起,尽量在 1.11 里面实现吧。

你也可以再详细描述下行为或者由你实现社区这边帮忙 review 呀,我不太记得有没有 JIRA 了,你可以找找或者直接建一个。

Best,
tison.


melin li  于2020年1月20日周一 下午4:59写道:

> 在spark中有一个spark.yarn.jars 参数,作业依赖jar 直接放在hdfs上,避免从本地上传jar,在分发,加快启动速度。
>
> YarnClusterDescriptor.java
>
> // upload and register ship files
> String systemJarHdfsDir =
> configuration.getString("stream.flink.system.jars.dir", "");
> List systemClassPaths = findHdfsJars(fs, systemJarHdfsDir, paths,
>   localResources, envShipFileList);
>
> String userJars = configuration.getString("stream.flink.use.jars", "");
> List userClassPaths;
> if (userJars != null && !"".equals(userJars)) {
>   userClassPaths = registerUserJars(fs, userJars.split(","), paths,
> localResources, envShipFileList);
> } else {
>   userClassPaths = Collections.emptyList();
> }
>
> if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
>   systemClassPaths.addAll(userClassPaths);
> }
>
> // normalize classpath by sorting
> Collections.sort(systemClassPaths);
> Collections.sort(userClassPaths);
>
> // classpath assembler
> StringBuilder classPathBuilder = new StringBuilder();
> if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
>   for (String userClassPath : userClassPaths) {
> classPathBuilder.append(userClassPath).append(File.pathSeparator);
>   }
> }
> for (String classPath : systemClassPaths) {
>   classPathBuilder.append(classPath).append(File.pathSeparator);
> }
> if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
>   for (String userClassPath : userClassPaths) {
> classPathBuilder.append(userClassPath).append(File.pathSeparator);
>   }
> }
>
> // Setup jar for ApplicationMaster
> Path remotePathJar = setupFlinkJar("flink.jar", fs, flinkJarPath,
> localResources);
>


Re: flink on yarn任务启动报错 The assigned slot container_e10_1579661300080_0005_01_000002_0 was removed.

2020-01-21 文章 tison
20/01/22 11:08:49 INFO yarn.YarnResourceManager: Closing TaskExecutor
connection container_e10_1579661300080_0005_01_02 because: The
heartbeat of TaskManager with id container_e10_1579661300080_0005_01_02
timed out.

你请求资源的时候把 slot 请求发到这台机器上了,然后它心跳超时了,你看看 TM 有没有正常起来,有没有资源不够或者挂了

Best,
tison.


郑 洁锋  于2020年1月22日周三 上午11:16写道:

> 大家好,
>flink on yarn任务启动时,发现报错了The assigned slot
> container_e10_1579661300080_0005_01_02_0 was removed.
>环境:flink1.8.1,cdh5.14.2,kafka0.10,jdk1.8.0_241
>
> flink版本为1.8.1,yarn上的日志:
>
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:
> 
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Starting
> YarnJobClusterEntrypoint (Version: , Rev:7297bac, Date:24.06.2019
> @ 23:04:28 CST)
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  OS current user:
> cloudera-scm
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Current
> Hadoop/Kerberos user: root
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JVM: Java
> HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.241-b07
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Maximum heap size:
> 406 MiBytes
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JAVA_HOME:
> /usr/java/default
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Hadoop version: 2.6.5
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JVM Options:
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: -Xms424m
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: -Xmx424m
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Program Arguments:
> (none)
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Classpath:
> core-1.8.0_release.jar:flink-shaded-hadoop-2-uber-2.6.5-7.0.jar:kafka10-source-1.8.0_release.jar:log4j-1.2.17.jar:mysql-all-side-1.8.0_release.jar:mysql-sink-1.8.0_release.jar:slf4j-log4j12-1.7.15.jar:sql.launcher-1.0-SNAPSHOT.jar:flink.jar:flink-conf.yaml:job.graph::/etc/hadoop/conf.cloudera.yarn:/run/cloudera-scm-agent/process/1129-yarn-NODEMANAGER:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-annotations.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-auth.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-aws.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-azure-datalake.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-common-tests.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-common.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-nfs.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-nfs-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-common-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-common-2.6.0-cdh5.14.2-tests.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-azure-datalake-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-aws-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-auth-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-annotations-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-format.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-format-sources.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-format-javadoc.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-tools.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-thrift.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-test-hadoop2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-scrooge_2.10.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-scala_2.10.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-protobuf.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-pig.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-pig-bundle.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-jackson.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-hadoop.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-hadoop-bundle.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-generator.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-encoding.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-common.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-column.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-c

Re: Re: flink on yarn任务启动报错 The assigned slot container_e10_1579661300080_0005_01_000002_0 was removed.

2020-01-21 文章 tison
那你看下 TM 那台机器上的 TM 日志,从 JM 端来看 TM 曾经成功起来过并注册了自己,你看看 TM 是怎么挂的或者别的什么情况

Best,
tison.


郑 洁锋  于2020年1月22日周三 上午11:54写道:

> TM没有起来,服务器本身内存cpu都是够的,还很空闲
>
> 
> zjfpla...@hotmail.com
>
> 发件人: tison<mailto:wander4...@gmail.com>
> 发送时间: 2020-01-22 11:25
> 收件人: user-zh<mailto:user-zh@flink.apache.org>
> 主题: Re: flink on yarn任务启动报错 The assigned slot
> container_e10_1579661300080_0005_01_02_0 was removed.
> 20/01/22 11:08:49 INFO yarn.YarnResourceManager: Closing TaskExecutor
> connection container_e10_1579661300080_0005_01_02 because: The
> heartbeat of TaskManager with id container_e10_1579661300080_0005_01_02
> timed out.
>
> 你请求资源的时候把 slot 请求发到这台机器上了,然后它心跳超时了,你看看 TM 有没有正常起来,有没有资源不够或者挂了
>
> Best,
> tison.
>
>
> 郑 洁锋  于2020年1月22日周三 上午11:16写道:
>
> > 大家好,
> >flink on yarn任务启动时,发现报错了The assigned slot
> > container_e10_1579661300080_0005_01_02_0 was removed.
> >环境:flink1.8.1,cdh5.14.2,kafka0.10,jdk1.8.0_241
> >
> > flink版本为1.8.1,yarn上的日志:
> >
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:
> >
> 
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Starting
> > YarnJobClusterEntrypoint (Version: , Rev:7297bac,
> Date:24.06.2019
> > @ 23:04:28 CST)
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  OS current user:
> > cloudera-scm
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Current
> > Hadoop/Kerberos user: root
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JVM: Java
> > HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.241-b07
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Maximum heap size:
> > 406 MiBytes
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JAVA_HOME:
> > /usr/java/default
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Hadoop version:
> 2.6.5
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JVM Options:
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: -Xms424m
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: -Xmx424m
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Program Arguments:
> > (none)
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Classpath:
> >
> core-1.8.0_release.jar:flink-shaded-hadoop-2-uber-2.6.5-7.0.jar:kafka10-source-1.8.0_release.jar:log4j-1.2.17.jar:mysql-all-side-1.8.0_release.jar:mysql-sink-1.8.0_release.jar:slf4j-log4j12-1.7.15.jar:sql.launcher-1.0-SNAPSHOT.jar:flink.jar:flink-conf.yaml:job.graph::/etc/hadoop/conf.cloudera.yarn:/run/cloudera-scm-agent/process/1129-yarn-NODEMANAGER:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-annotations.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-auth.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-aws.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-azure-datalake.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-common-tests.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-common.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-nfs.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-nfs-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-common-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-common-2.6.0-cdh5.14.2-tests.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-azure-datalake-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-aws-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-auth-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/hadoop-annotations-2.6.0-cdh5.14.2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-format.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-format-sources.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-format-javadoc.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-tools.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-thrift.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-test-hadoop2.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-scrooge_2.10.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-scala_2.10.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-protobuf.jar:/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/lib/hadoop/parquet-pig.jar:/opt/cloudera/parcels/CDH

Re: [ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 文章 tison
Congratulations! Dian

Best,
tison.


Zhu Zhu  于2020年1月17日周五 上午10:47写道:

> Congratulations Dian.
>
> Thanks,
> Zhu Zhu
>
> hailongwang <18868816...@163.com> 于2020年1月17日周五 上午10:01写道:
>
>>
>> Congratulations Dian !
>>
>> Best,
>> Hailong Wang
>>
>>
>>
>>
>> 在 2020-01-16 21:15:34,"Congxian Qiu"  写道:
>>
>> Congratulations Dian Fu
>>
>> Best,
>> Congxian
>>
>>
>> Jark Wu  于2020年1月16日周四 下午7:44写道:
>>
>>> Congratulations Dian and welcome on board!
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 16 Jan 2020 at 19:32, Jingsong Li 
>>> wrote:
>>>
>>> > Congratulations Dian Fu. Well deserved!
>>> >
>>> > Best,
>>> > Jingsong Lee
>>> >
>>> > On Thu, Jan 16, 2020 at 6:26 PM jincheng sun >> >
>>> > wrote:
>>> >
>>> >> Congrats Dian Fu and welcome on board!
>>> >>
>>> >> Best,
>>> >> Jincheng
>>> >>
>>> >> Shuo Cheng  于2020年1月16日周四 下午6:22写道:
>>> >>
>>> >>> Congratulations!  Dian Fu
>>> >>>
>>> >>> > Xingbo Wei Zhong  于2020年1月16日周四 下午6:13写道:  jincheng sun
>>> >>> 于2020年1月16日周四 下午5:58写道:
>>> >>>
>>> >>
>>> >
>>> > --
>>> > Best, Jingsong Lee
>>> >
>>>
>>
>>
>>
>>
>>
>


Re: CountEvictor 与 TriggerResult.FIRE_AND_PURGE 清理窗口数据有区别吗?

2020-01-21 文章 tison
正好看到这一部分,还是有的,你考虑下滑动的计数窗口

[1] 会在 fire 之后把整个 windowState 丢掉,[2] 其实会重新计算 evict 之后的 windowState

Best,
tison.


USERNAME  于2020年1月21日周二 下午5:21写道:

> 大家,新年快乐~
>
>
> [1] TriggerResult.FIRE_AND_PURGE
>
> https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java#L74
> [2] CountEvictor
>
> https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
>
>


Re: Re: CountEvictor 与 TriggerResult.FIRE_AND_PURGE 清理窗口数据有区别吗?

2020-01-21 文章 tison
你读一下 EvictingWindowOperator 相关代码或者说 Evictor#evictBefore 的调用链,里面关于 window
state 的处理是比较 hack 的,用文字说也起不到简练的作用

private void emitWindowContents(W window, Iterable>
contents, ListState> windowState) throws Exception {
   timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

   // Work around type system restrictions...
   FluentIterable> recordsWithTimestamp = FluentIterable
  .from(contents)
  .transform(new Function, TimestampedValue>() {
 @Override
 public TimestampedValue apply(StreamRecord input) {
return TimestampedValue.from(input);
 }
  });
   evictorContext.evictBefore(recordsWithTimestamp,
Iterables.size(recordsWithTimestamp));

   FluentIterable projectedContents = recordsWithTimestamp
  .transform(new Function, IN>() {
 @Override
 public IN apply(TimestampedValue input) {
return input.getValue();
 }
  });

   processContext.window = triggerContext.window;
   userFunction.process(triggerContext.key, triggerContext.window,
processContext, projectedContents, timestampedCollector);
   evictorContext.evictAfter(recordsWithTimestamp,
Iterables.size(recordsWithTimestamp));

   //work around to fix FLINK-4369, remove the evicted elements from
the windowState.
   //this is inefficient, but there is no other way to remove elements
from ListState, which is an AppendingState.
   windowState.clear();
   for (TimestampedValue record : recordsWithTimestamp) {
  windowState.add(record.getStreamRecord());
   }
}

Best,
tison.


USERNAME  于2020年1月21日周二 下午8:25写道:

> evict 丢弃掉的数据,在内存或者RocksDB中也会同步删除吗?
>
>
>
>
>
>
> 在 2020-01-21 17:27:38,"tison"  写道:
> >正好看到这一部分,还是有的,你考虑下滑动的计数窗口
> >
> >[1] 会在 fire 之后把整个 windowState 丢掉,[2] 其实会重新计算 evict 之后的 windowState
> >
> >Best,
> >tison.
> >
> >
> >USERNAME  于2020年1月21日周二 下午5:21写道:
> >
> >> 大家,新年快乐~
> >>
> >>
> >> [1] TriggerResult.FIRE_AND_PURGE
> >>
> >>
> https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java#L74
> >> [2] CountEvictor
> >>
> >>
> https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
> >>
> >>
>


Re: 问一下Flink Plan Visualizer什么时候会更新成1.9的样式

2020-01-09 文章 tison
cc Yadong

帮你抄送了 WebUI 重构的 manager

Best,
tison.


 于2020年1月10日周五 上午11:26写道:

> 1.9的前端ui不是相比于1.8重构了吗,官网的visualizer页面还是1.8的老样式
>
> -邮件原件-
> 发件人: tison 
> 发送时间: 2020年1月8日 13:14
> 收件人: user-zh 
> 主题: Re: 问一下Flink Plan Visualizer什么时候会更新成1.9的样式
>
> 请问你所说的 1.9 的样式是怎么样的呢?我记得最近有跟 visualizer 相关的讨论,但是没有这个特殊的 issue,你可以直接在 JIRA
> 上提 issue
>
> Best,
> tison.
>
>
>  于2020年1月8日周三 下午12:56写道:
>
> > 有大佬能解答下吗
> >
> > -邮件原件-
> > 发件人: slle...@aliyun.com.INVALID 
> > 发送时间: 2020年1月6日 11:15
> > 收件人: user-zh@flink.apache.org
> > 主题: 问一下Flink Plan Visualizer什么时候会更新成1.9的样式
> >
> > 链接地址:https://flink.apache.org/visualizer/index.html
> >
> >
>


Re: jobgraph 生成

2020-01-07 文章 tison
A public way to get JSON plan of a JobGraph is, with an existing Flink
Cluster, use REST API JarPlan[1].

Best,
tison.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#jars-jarid-plan


tison  于2020年1月8日周三 上午11:08写道:

> Hi Zhang,
>
> I just notice that it is sent to user list. Please send to user-zh list(in
> cc) next time if you want to discuss in Chinese.
>
> Best,
> tison.
>
>
> tison  于2020年1月8日周三 上午11:06写道:
>
>> 如果你是流作业的话,参考这个页面[1]搞到 JobGraph 之后可以
>>
>> JsonPlanGenerator.generatePlan(jobGraph)
>>
>> 拿到 JobGraph 的 JSON。不过这个是非常内部的逻辑,没有任何兼容性保障。
>>
>> Best,
>> tison.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/mesos.html#flink-job-cluster-on-mesos
>>
>>
>> 张江  于2020年1月8日周三 上午11:01写道:
>>
>>> 大家好,
>>>
>>> 通过调用env.getExecutionPlan可以以json格式得到streamGraph,放到flink
>>> visualizer里可以得到可视化结果。现在我想以类似方法得到 jobGraph,生成jobGraph的json文件。不知道应该怎么做?
>>>
>>> flink里似乎没有直接的API可以调用,但是我在flink web
>>> ui界面上可以看到job经过chain后的执行图,如下图所示。这个是怎么生成的?
>>>
>>>
>>> 谢谢
>>>
>>> 张江
>>> 邮箱:zjkingdom2...@163.com
>>>
>>> <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1=%E5%BC%A0%E6%B1%9F=zjkingdom2010%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png=%5B%22%E9%82%AE%E7%AE%B1%EF%BC%9Azjkingdom2010%40163.com%22%5D>
>>>
>>> 签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail88> 定制
>>>
>>


Re: jobgraph 生成

2020-01-07 文章 tison
Hi Zhang,

I just notice that it is sent to user list. Please send to user-zh list(in
cc) next time if you want to discuss in Chinese.

Best,
tison.


tison  于2020年1月8日周三 上午11:06写道:

> 如果你是流作业的话,参考这个页面[1]搞到 JobGraph 之后可以
>
> JsonPlanGenerator.generatePlan(jobGraph)
>
> 拿到 JobGraph 的 JSON。不过这个是非常内部的逻辑,没有任何兼容性保障。
>
> Best,
> tison.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/mesos.html#flink-job-cluster-on-mesos
>
>
> 张江  于2020年1月8日周三 上午11:01写道:
>
>> 大家好,
>>
>> 通过调用env.getExecutionPlan可以以json格式得到streamGraph,放到flink
>> visualizer里可以得到可视化结果。现在我想以类似方法得到 jobGraph,生成jobGraph的json文件。不知道应该怎么做?
>>
>> flink里似乎没有直接的API可以调用,但是我在flink web
>> ui界面上可以看到job经过chain后的执行图,如下图所示。这个是怎么生成的?
>>
>>
>> 谢谢
>>
>> 张江
>> 邮箱:zjkingdom2...@163.com
>>
>> <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1=%E5%BC%A0%E6%B1%9F=zjkingdom2010%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png=%5B%22%E9%82%AE%E7%AE%B1%EF%BC%9Azjkingdom2010%40163.com%22%5D>
>>
>> 签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail88> 定制
>>
>


Re: 问一下Flink Plan Visualizer什么时候会更新成1.9的样式

2020-01-07 文章 tison
请问你所说的 1.9 的样式是怎么样的呢?我记得最近有跟 visualizer 相关的讨论,但是没有这个特殊的 issue,你可以直接在 JIRA
上提 issue

Best,
tison.


 于2020年1月8日周三 下午12:56写道:

> 有大佬能解答下吗
>
> -邮件原件-
> 发件人: slle...@aliyun.com.INVALID 
> 发送时间: 2020年1月6日 11:15
> 收件人: user-zh@flink.apache.org
> 主题: 问一下Flink Plan Visualizer什么时候会更新成1.9的样式
>
> 链接地址:https://flink.apache.org/visualizer/index.html
>
>


Re: Flink 1.10 StopWithSavepoint only suit for the sources that implement the StoppableFunction interface?

2020-03-12 文章 tison
The StoppableFunction is gone.

See also https://issues.apache.org/jira/browse/FLINK-11889

Best,
tison.


LakeShen  于2020年3月12日周四 下午5:44写道:

> Hi community,
> now  I am seeing the FLIP-45 , as I see the stop command only suit
> for the sources that implement the StoppableFunction interface.
> So I have a question is that if I use StopWithSavepoint command to
> stop my flink task , just like './flink stop -p xxx ...', this command
> only suit for the sources that implement the StoppableFunction interface,
> is it correct?
> Thanks to your reply.
>
> Best wishes,
> LakeShen
>


Re: 关于Flink 命令行参数广播的问题

2020-03-11 文章 tison
Hi Aven,

静态字段的使用可能会很 tricky,因为只有同一个 task 的代码才运行在同一个 classloader 里。我见过想用静态字段做全局 Map
存储的,那个实际上只有并行度设置为 1 的时候语义才对。

你说启动的生命周期执行一些用户代码,那其实就是 RichFunction 的 open
方法,它就是设计来做这个的。具体可以看你的实际业务,未必要搞得这么奇怪(x

Best,
tison.


aven.wu  于2020年3月12日周四 上午10:54写道:

> Hello
>
> 还有一个问题,除了在算子的Open方法中获取这个参数还有别的地方可以获取吗?或者在Gobgraph启动的生命周期里面有哪个阶段可以被调用提执行一些用户代码。
>
> 我的需求是需要通过命令行参数初始化一些静态类的属性,全局的静态类会被算子调用以执行某些通用的功能,如果我在open方法中去初始化的话是不太优雅,并且为了保证可用必须在每个算子的Open方法中都调用,对于一些非Rich的算子使用静态方法就会有问题。
>
> Best
> Aven
>
> 发件人: zhisheng
> 发送时间: 2020年3月11日 21:16
> 收件人: user-zh
> 主题: Re: 关于Flink 命令行参数广播的问题
>
> hi,aven.wu
>
> 可以使用 ParameterTool
> 获取到传入的参数,然后通过 env.getConfig().setGlobalJobParameters(parameterTool);
>
> 在算子中可以在 open 方法里面通过
> getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); 获取到配置
>
> aven.wu  于2020年3月11日周三 下午3:42写道:
>
> > Hi,大家好!
> > 遇到一个问题,在使用flink run
> >
> 提交任务时命令行参数也是就那些被main方法接收的那些,不知道如何在taskManager获取到。导致必须从外部系统(hdfs,文件)来读取例如ES,mysql等连接配置。Flink是否有这种广播命令行参数的能力,如果没有是否会考虑加入这个特性。
> >
> > Best
> > Aven
> >
> >
>
>


Re: Re: jobGroph如何以pre-job的方式提交到yarn运行

2020-03-06 文章 tison
OK 你先说一下你的 Flink 版本是啥,我感觉这个是很久以前的版本,差不多 1.7
的样子,然后没有保密问题的话你的代码文件发一下,然后完整报错堆栈也发一下。

这个可能是你初始化 YarnClusterDescriptor 的时候有问题,你提供的都是很残缺的片段,没法猜测到底是啥原因。

一般来说 Flink 现在的 Client 实现并不是很好,直接使用 CLI 是不会有太多问题的,如果直接依赖 ClusterDescriptor
这些抽象里面有一些潜规则,你自己看的话可能得对着 Flink 的源码使用点逐步调试排查。

Best,
tison.


nicygan  于2020年3月7日周六 下午3:16写道:

> tison,你好运行到这里时,报空指针
> Caused by: java.lang.NullPointerException
> at
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:506)
>
> getNodeReports方法中:
> GetClusterNodesResponse response = rmClient.getClusterNodes(request);
> 这句的rmClient为null值。
>
>
>
> 我看YarnClientImpl中,有个start()可为rmClient赋值,但是,我加上此方法执行时又会报以下错误:
> Exception in thread "main"
> org.apache.hadoop.service.ServiceStateException:
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl cannot enter state
> STARTED from state NOTINITED
> at
> org.apache.hadoop.service.ServiceStateModel.checkStateTransition(ServiceStateModel.java:129)
> at
> org.apache.hadoop.service.ServiceStateModel.enterState(ServiceStateModel.java:111)
> at
> org.apache.hadoop.service.AbstractService.start(AbstractService.java:190)
>
>
>
>
>
>
>
>
> 在 2020-03-07 11:15:10,"tison"  写道:
> >不成功的报错是啥?
> >
> >Best,
> >tison.
> >
> >
> >nicygan  于2020年3月7日周六 上午11:14写道:
> >
> >> dear all:
> >>
> >>
> 我用的flink版本1.9,现在我把flinksql语句生成了jobGroph,现在我不知道如何提交到yarn上,以pre-job方式运行。我尝试通过生成
> >> yarnClusterDescriptor提交代码,但不成功,代码大致如下,各位有没有成功示例求分享。
> >>
> >> ..
> >> JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph();
> >> ..
> >> ..
> >> yarnClusterDescriptor.deployJobCluster(
> >> clusterSpecification,
> >>   jobGraph, true);
> >>
> >>
>


Re: jobGroph如何以pre-job的方式提交到yarn运行

2020-03-06 文章 tison
不成功的报错是啥?

Best,
tison.


nicygan  于2020年3月7日周六 上午11:14写道:

> dear all:
>
> 我用的flink版本1.9,现在我把flinksql语句生成了jobGroph,现在我不知道如何提交到yarn上,以pre-job方式运行。我尝试通过生成
> yarnClusterDescriptor提交代码,但不成功,代码大致如下,各位有没有成功示例求分享。
>
> ..
> JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph();
> ..
> ..
> yarnClusterDescriptor.deployJobCluster(
> clusterSpecification,
>   jobGraph, true);
>
>


Re: Re: Re: jobGroph如何以pre-job的方式提交到yarn运行

2020-03-07 文章 tison
报错前面加上这两行

final YarnConfiguration yarnConfiguration = new YarnConfiguration();
yarnClient.init(yarnConfiguration);

如果还不对就查一下 HADOOP_CLASSPATH 和 yarn-site 这些配置有没有正确配置上

Best,
tison.


nicygan  于2020年3月7日周六 下午4:53写道:

> tison,你好。
>
>
> 版本是1.9,没啥隐私,代码如下:
>   JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph();
>
> YarnConfiguration yarnConfiguration = new YarnConfiguration();
>   String configurationDirectory =
> CliFrontend.getConfigurationDirectoryFromEnv();
>   Configuration flinkConfiguration =
> GlobalConfiguration.loadConfiguration(configurationDirectory);
>   YarnClient yarnClient = YarnClient.createYarnClient();
>   yarnClient.start();  //报错
>
>   YarnClusterDescriptor yarnClusterDescriptor =
> new YarnClusterDescriptor(
> flinkConfiguration,
> yarnConfiguration,
> configurationDirectory,
> yarnClient, false);
>
>   yarnClusterDescriptor.setLocalJarPath(new
> Path(configurationDirectory + "/../lib/*.jar"));
>
>   ClusterSpecification clusterSpecification = new
> ClusterSpecification.ClusterSpecificationBuilder()
> .setMasterMemoryMB(256)
> .setTaskManagerMemoryMB(740)
> .setNumberTaskManagers(1)
> .setSlotsPerTaskManager(1)
> .createClusterSpecification();
>
>   ClusterClient clusterClient =
> yarnClusterDescriptor.deployJobCluster(
> clusterSpecification,
> jobGraph, true);  //报错
>
>   clusterClient.submitJob(jobGraph,
> StreamSQLPreJob.class.getClassLoader());
>
>
>
>
> 在 2020-03-07 15:34:12,"tison"  写道:
> >OK 你先说一下你的 Flink 版本是啥,我感觉这个是很久以前的版本,差不多 1.7
> >的样子,然后没有保密问题的话你的代码文件发一下,然后完整报错堆栈也发一下。
> >
> >这个可能是你初始化 YarnClusterDescriptor 的时候有问题,你提供的都是很残缺的片段,没法猜测到底是啥原因。
> >
> >一般来说 Flink 现在的 Client 实现并不是很好,直接使用 CLI 是不会有太多问题的,如果直接依赖 ClusterDescriptor
> >这些抽象里面有一些潜规则,你自己看的话可能得对着 Flink 的源码使用点逐步调试排查。
> >
> >Best,
> >tison.
> >
> >
> >nicygan  于2020年3月7日周六 下午3:16写道:
> >
> >> tison,你好运行到这里时,报空指针
> >> Caused by: java.lang.NullPointerException
> >> at
> >>
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:506)
> >>
> >> getNodeReports方法中:
> >> GetClusterNodesResponse response = rmClient.getClusterNodes(request);
> >> 这句的rmClient为null值。
> >>
> >>
> >>
> >> 我看YarnClientImpl中,有个start()可为rmClient赋值,但是,我加上此方法执行时又会报以下错误:
> >> Exception in thread "main"
> >> org.apache.hadoop.service.ServiceStateException:
> >> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl cannot enter state
> >> STARTED from state NOTINITED
> >> at
> >>
> org.apache.hadoop.service.ServiceStateModel.checkStateTransition(ServiceStateModel.java:129)
> >> at
> >>
> org.apache.hadoop.service.ServiceStateModel.enterState(ServiceStateModel.java:111)
> >> at
> >>
> org.apache.hadoop.service.AbstractService.start(AbstractService.java:190)
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-03-07 11:15:10,"tison"  写道:
> >> >不成功的报错是啥?
> >> >
> >> >Best,
> >> >tison.
> >> >
> >> >
> >> >nicygan  于2020年3月7日周六 上午11:14写道:
> >> >
> >> >> dear all:
> >> >>
> >> >>
> >>
> 我用的flink版本1.9,现在我把flinksql语句生成了jobGroph,现在我不知道如何提交到yarn上,以pre-job方式运行。我尝试通过生成
> >> >> yarnClusterDescriptor提交代码,但不成功,代码大致如下,各位有没有成功示例求分享。
> >> >>
> >> >> ..
> >> >> JobGraph jobGraph = env.getStreamGraph("StreamSql").getJobGraph();
> >> >> ..
> >> >> ..
> >> >> yarnClusterDescriptor.deployJobCluster(
> >> >> clusterSpecification,
> >> >>   jobGraph, true);
> >> >>
> >> >>
> >>
>


Re: How to change the flink web-ui jobServer?

2020-03-14 文章 tison
IIRC Flink on Kubernetes doesn't support configure rest port as port range.

Maybe Yang(in cc) can give more information and if so, our current logic
only take care of RestOptions.PORT but not RestOptions.BIND_PORT, which
will be a bug.

Best,
tison.


LakeShen  于2020年3月15日周日 上午11:25写道:

> Ok, thanks! Arvid
>
> Arvid Heise  于2020年3月10日周二 下午4:14写道:
>
>> Hi LakeShen,
>>
>> you can change the port with
>>
>> conf.setInteger(RestOptions.PORT, 8082);
>>
>> or if want to be on the safe side specify a range
>>
>> conf.setString(RestOptions.BIND_PORT, "8081-8099");
>>
>>
>> On Mon, Mar 9, 2020 at 10:47 AM LakeShen 
>> wrote:
>>
>>> Hi community,
>>>now I am moving the flink job to k8s,and I plan to use the
>>> ingress to show the flink web ui  , the problem is that fink job server
>>> aren't correct, so I want to change the flink web-ui jobserver ,I don't
>>> find the any method  to change it ,are there some method to do that?
>>>Thanks to your reply.
>>>
>>> Best wishes,
>>> LakeShen
>>>
>>


Re: org.apache.flink.table.planner.PlanningConfigurationBuilder.java

2020-03-10 文章 tison
这个文件是编译时生成的,请在根目录下运行 mvn package

Best,
tison.


jaslou  于2020年3月10日周二 下午11:15写道:

> Hi,
>
>
> 在编译源码的时候发现flink-table-parnner模块的org.apache.flink.table.planner.PlanningConfigurationBuilder.java类报错,
> 找不到
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl文件,发现flink-sql-parser模块下没有impl这个package以及FlinkSqlParserImpl文件
>
> version:release-1.10.0
>
> Best,
> Jaslou
>


Re: 关于flink run -m yarn提交失败。flink1.9

2020-04-14 文章 tison
-yd 参数影响的是你是否使用 perjob 模式提交作业,简单地说

with -yd 以 perjob 模式提交作业,即启动一个新集群
without -yd 提交到一个现有的 Flink on YARN 集群

哪个是你的需求呢?有没有实现用 yarn-session 启动 Flink on YARN 集群呢?

Best,
tison.


guanyq  于2020年4月15日周三 上午8:46写道:

> 提交失败,本人测试与-yd参数有关系,这个参数去掉就可以提交了。但是不知道 -yd这个参数影响了什么?
> At 2020-04-14 15:31:00, "guanyq"  wrote:
> >提交失败,yarn资源也还有很多,为什么会提交失败呢?
> >
> >提交脚本
> >./bin/flink run -m yarn-cluster \
> >-ynm TestDataProcess \
> >-yd \
> >-yn 2 \
> >-ytm 1024 \
> >-yjm 1024 \
> >-c com.data.processing.unconditionalacceptance.TestDataProcess \
> >./tasks/UnconditionalAcceptanceDataProcess.jar \
> >
> >
> >yarn资源
> >Apps Submitted Apps PendingApps RunningApps Completed  Containers
> Running  Memory Used Memory TotalMemory Reserved VCores Used
>  VCores TotalVCores Reserved Active NodesDecommissioned Nodes
> Lost Nodes  Unhealthy Nodes Rebooted Nodes
> >2390   12  227 173 334 GB  1.42 TB 0 B 173
>  288 0   9   0   0   0   0
> >
> >
> >
> >2020-04-14 15:14:19,002 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:19,253 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:19,504 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:19,755 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:20,006 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:20,257 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:20,508 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:20,759 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:21,011 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:21,262 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:21,513 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:21,764 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:22,015 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:22,265 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:22,517 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:22,768 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> >2020-04-14 15:14:23,019 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested re

Re: 为消息分配时间戳但不想重新分配水印

2020-04-16 文章 tison
在 getCurrentWatermark 里返回 null 就行了,会 forward 此前的 watermark
的。另外语义上使用 AssignerWithPunctuatedWatermarks 会更合适一点。

Best,
tison.


taowang  于2020年4月16日周四 下午5:13写道:

> Hello,大家好:
> 在flink
> stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。
> 为了实现这个功能,我想有两种方法:
> 1. 在算子输出后面重新为消息分配水印:看到flink
> stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark
> `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。
> 2.
> 在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。
>
>
> 我现在只能使用`assignTimestampsAndWatermarks`
> 去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗?
> 感谢解答!


Re: 为消息分配时间戳但不想重新分配水印

2020-04-16 文章 tison
从语义上说,已经有产生 Watermark 的逻辑了,如果 forward 此前的 watermark
在其他一些用户场景下或许也不合适。从另一个角度考虑你也可以把 watermark 带在 element
上,实现 AssignerWithPunctuatedWatermarks 的 Watermark
checkAndGetNextWatermark(T lastElement, long extractedTimestamp); 方法时从
element 取出来

Best,
tison.


tison  于2020年4月16日周四 下午10:36写道:

> 喔,看差了...Flink 原生没有这样的支持,不过可以扩展基础的算子来实现你要的逻辑
>
> 参考 assignTimestampsAndWatermarks
> 的实现,以及 TimestampsAndPunctuatedWatermarksOperator,不要重写 processWatermark
> 方法,应该可以实现。DataStream 方面调用更基础的 transform 方法
>
> 如果你觉得这是一个合理的需求,应该开箱即用,也可以到 Flink 的 JIRA(issue)列表上提
> https://jira.apache.org/jira/projects/FLINK/issues 注册 id 后创建 JIRA 单即可
>
> Best,
> tison.
>
>
> taowang  于2020年4月16日周四 下午10:12写道:
>
>> 感谢回复,但是很抱歉我试了一下发现不可以。
>> 无论是使用了`AssignerWithPeriodicWatermarks`的`getCurrentWatermark`还是`AssignerWithPunctuatedWatermarks`的`checkAndGetNextWatermark`,当它们`return
>> null`时下游算子拿到的水印都显示为`No
>> Watermark`,我在下游算子中打印出`context.currentWatermark()`发现都是`Long.MIN_VALUE`的值。
>> 看了这两个接口文档,不太理解这里的`no new watermark will be
>> generated.`是什么意思,一般理解为不会生成新的水印那之前的水印就应该被继续传下去吧,但实际看来是下游接收不到水印(`no
>> watermark`?)。
>> @tison,你那边是尝试过这种写法吗,如果可以的话,能麻烦给一个简单的demo吗。
>>
>>
>> 感谢帮助!
>> ```
>> public interface AssignerWithPeriodicWatermarks extends
>> TimestampAssigner {
>>
>>  /**
>>  * Returns the current watermark. This method is periodically called by
>> the
>>  * system to retrieve the current watermark. The method may return {@code
>> null} to
>>  * indicate that no new Watermark is available.
>>  *
>>  * The returned watermark will be emitted only if it is non-null and
>> its timestamp
>>  * is larger than that of the previously emitted watermark (to preserve
>> the contract of
>>  * ascending watermarks). If the current watermark is still
>>  * identical to the previous one, no progress in event time has happened
>> since
>>  * the previous call to this method. If a null value is returned, or the
>> timestamp
>>  * of the returned watermark is smaller than that of the last emitted
>> one, then no
>>  * new watermark will be generated.
>>  *
>>  * The interval in which this method is called and Watermarks are
>> generated
>>  * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
>>  *
>>  * @see org.apache.flink.streaming.api.watermark.Watermark
>>  * @see ExecutionConfig#getAutoWatermarkInterval()
>>  *
>>  * @return {@code Null}, if no watermark should be emitted, or the next
>> watermark to emit.
>>  */
>>  @Nullable
>>  Watermark getCurrentWatermark();
>> }
>> ```
>>
>>
>>  原始邮件
>> 发件人: tison
>> 收件人: user-zh
>> 发送时间: 2020年4月16日(周四) 20:33
>> 主题: Re: 为消息分配时间戳但不想重新分配水印
>>
>>
>> 在 getCurrentWatermark 里返回 null 就行了,会 forward 此前的 watermark 的。另外语义上使用
>> AssignerWithPunctuatedWatermarks 会更合适一点。 Best, tison. taowang <
>> taow...@deepglint.com> 于2020年4月16日周四 下午5:13写道: > Hello,大家好: > 在flink >
>> stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。 >
>> 为了实现这个功能,我想有两种方法: > 1. 在算子输出后面重新为消息分配水印:看到flink >
>> stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark
>> > `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。 > 2. >
>> 在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。
>> > > > 我现在只能使用`assignTimestampsAndWatermarks` >
>> 去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗? > 感谢解答!
>
>


Re: 为消息分配时间戳但不想重新分配水印

2020-04-16 文章 tison
喔,看差了...Flink 原生没有这样的支持,不过可以扩展基础的算子来实现你要的逻辑

参考 assignTimestampsAndWatermarks
的实现,以及 TimestampsAndPunctuatedWatermarksOperator,不要重写 processWatermark
方法,应该可以实现。DataStream 方面调用更基础的 transform 方法

如果你觉得这是一个合理的需求,应该开箱即用,也可以到 Flink 的 JIRA(issue)列表上提
https://jira.apache.org/jira/projects/FLINK/issues 注册 id 后创建 JIRA 单即可

Best,
tison.


taowang  于2020年4月16日周四 下午10:12写道:

> 感谢回复,但是很抱歉我试了一下发现不可以。
> 无论是使用了`AssignerWithPeriodicWatermarks`的`getCurrentWatermark`还是`AssignerWithPunctuatedWatermarks`的`checkAndGetNextWatermark`,当它们`return
> null`时下游算子拿到的水印都显示为`No
> Watermark`,我在下游算子中打印出`context.currentWatermark()`发现都是`Long.MIN_VALUE`的值。
> 看了这两个接口文档,不太理解这里的`no new watermark will be
> generated.`是什么意思,一般理解为不会生成新的水印那之前的水印就应该被继续传下去吧,但实际看来是下游接收不到水印(`no
> watermark`?)。
> @tison,你那边是尝试过这种写法吗,如果可以的话,能麻烦给一个简单的demo吗。
>
>
> 感谢帮助!
> ```
> public interface AssignerWithPeriodicWatermarks extends
> TimestampAssigner {
>
>  /**
>  * Returns the current watermark. This method is periodically called by the
>  * system to retrieve the current watermark. The method may return {@code
> null} to
>  * indicate that no new Watermark is available.
>  *
>  * The returned watermark will be emitted only if it is non-null and
> its timestamp
>  * is larger than that of the previously emitted watermark (to preserve
> the contract of
>  * ascending watermarks). If the current watermark is still
>  * identical to the previous one, no progress in event time has happened
> since
>  * the previous call to this method. If a null value is returned, or the
> timestamp
>  * of the returned watermark is smaller than that of the last emitted one,
> then no
>  * new watermark will be generated.
>  *
>  * The interval in which this method is called and Watermarks are
> generated
>  * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
>  *
>  * @see org.apache.flink.streaming.api.watermark.Watermark
>  * @see ExecutionConfig#getAutoWatermarkInterval()
>  *
>  * @return {@code Null}, if no watermark should be emitted, or the next
> watermark to emit.
>  */
>  @Nullable
>  Watermark getCurrentWatermark();
> }
> ```
>
>
>  原始邮件
> 发件人: tison
> 收件人: user-zh
> 发送时间: 2020年4月16日(周四) 20:33
> 主题: Re: 为消息分配时间戳但不想重新分配水印
>
>
> 在 getCurrentWatermark 里返回 null 就行了,会 forward 此前的 watermark 的。另外语义上使用
> AssignerWithPunctuatedWatermarks 会更合适一点。 Best, tison. taowang <
> taow...@deepglint.com> 于2020年4月16日周四 下午5:13写道: > Hello,大家好: > 在flink >
> stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。 >
> 为了实现这个功能,我想有两种方法: > 1. 在算子输出后面重新为消息分配水印:看到flink >
> stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark
> > `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。 > 2. >
> 在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。
> > > > 我现在只能使用`assignTimestampsAndWatermarks` >
> 去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗? > 感谢解答!


Re: 1.10任务执行过程--源码的一些疑问

2020-04-19 文章 tison
invokable 一般是 StreamTask 或者它的子类 StreamSourceTask,具体的 UDF 在 StreamTask
里,有几层包装。

MailBox 那些其实是一个简单的 EventLoop 实现,或者你理解为 Actor Model 的实现也行,可以参考这些名词的解释文章一一对应。

Best,
tison.


祝尚 <17626017...@163.com> 于2020年4月19日周日 下午5:43写道:

> Hi,all
> 在阅读1.10有关job执行过程相关源码时遇到一些疑问,我在看到Task#doRun()方法
> invokable.invoke();具体执行过程应该在这个方法里吧?
> 进一步看了StreamTask#invoke()->runMailboxLoop();继续往下深入也没发现最终调用udf的入口
> 问题1:MailboxProcessor、Mailbox、Mail这些概念什么意思,什么作用?
>
>
> 然而在另一处实例化AbstractInvokable时,比如StreamTask构造函数里会调用processInput方法,这个就类似1.9之前的实现方式了
> this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox,
> actionExecutor);
> 问题2:这里面是真正的数据处理过程吗?为什么不像1.9之前那样在invokable.invoke()里面做业务处理?
> 感谢您的答复!
>
>
>
> Best,
> Sun.Zhu
>
>
>
>


Re: 如何看到他人问题

2020-04-21 文章 tison
cc


Leonard Xu  于2020年4月21日周二 下午5:03写道:

> Hi,
> 订阅user-zh邮件邮件组即可收到该邮件组里的所有邮件,
> 可以发送任意内容的邮件到  user-zh-subscr...@flink.apache.org  订阅来自
> user-zh@flink.apache.org 邮件组的邮件
>
> 邮件组的订阅管理,可以参考[1]
>
> 祝好,
> Leonard Xu
> https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list
>
> > 在 2020年4月21日,16:55,一袭月色 <1906286...@qq.com> 写道:
> >
> > 如何看到他人问题
>
>


Re: Re: flink启动任务的方式

2020-04-21 文章 tison
REST API jar run endpoint 不支持关联其他 jar 听起来是个问题。FatJar 是一种解决方案,这个可以提到 JIRA
上作为需求(x

Best,
tison.


Arnold Zai  于2020年4月21日周二 下午5:46写道:

> jarFiles参数不是个参数列表么,多传几个。
>
> 或把依赖提前部署到${FLINK_HOME}/plugins里
>
> chenxuying  于2020年4月21日周二 下午3:36写道:
>
> > 这个是可以 , 不过我们的需求不允许打FatJar
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-04-21 15:27:48,"Arnold Zai"  写道:
> > >打个FatJar
> > >
> > >chenxuying  于2020年4月21日周二 下午2:47写道:
> > >
> > >> 请问下目前flink的启动方式有哪些
> > >> 1 通过命令行来执行
> > >> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
> > >> cn.xxx.flink.table.sql.Job
> /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
> > >> 2通过自带的webui页面上传jar , submit jar
> > >> 3 通过代码 createRemoteEnvironment
> > >>
> > >> 目前主要使用的是,通过代码请求rest api http请求来实现上传jar,跑任务, 但是目前遇到的问题是,通过rest api
> > >> 无法实现命令行那样提供其他的jar包
> > >>
> > >>
> > >>
> > >>
> >
>


Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-15 文章 tison
IngestionTime 多次运行结果不一样很正常啊,试试 event time?

Best,
tison.


xuefli  于2020年4月15日周三 下午10:10写道:

> 遇到一个非常头痛的问题
>
> Flink1.10的集群,用hdfs做backend
>
> 一个流aStream准备了10亿的数据,另外一个流bStream百万
> 如果如下操作
>
> 我遇到一个问题 双流Join 
> 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同
>  。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后 
> 再对cStream进行keyBy-->timeWindow-->sum.
> 我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
> 用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
> 但数据量很大时,就会这样。
>
>
> 每次计算的结果不一样,这个对业务系统挑战巨大
>
>
> 发送自 Windows 10 版邮件应用
>
>


Re: flink 1.7.2 YARN Session模式提交任务问题求助

2020-04-15 文章 tison
注意环境变量和 fs.hdfs.hdfsdefault 要配置成 HDFS 路径或 YARN
集群已知的本地路径,不要配置成客户端的路径。因为实际起作用是在拉起 TM 的那台机器上解析拉取的。

Best,
tison.


Chief  于2020年4月15日周三 下午7:40写道:

> hi Yangze Guo
> 您说的环境变量已经在当前用户的环境变量文件里面设置了,您可以看看我的问题描述,现在如果checkpoint的路径设置不是namenode
> ha的nameservice就不会报错,checkpoint都正常。
>
>
>
>
> --原始邮件--
> 发件人:"Yangze Guo" 发送时间:2020年4月15日(星期三) 下午3:00
> 收件人:"user-zh"
> 主题:Re: flink 1.7.2 YARN Session模式提交任务问题求助
>
>
>
> Flink需要设置hadoop相关conf位置的环境变量 YARN_CONF_DIR or HADOOP_CONF_DIR [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html
>
> Best,
> Yangze Guo
>
> On Mon, Apr 13, 2020 at 10:52 PM Chief  
>  大家好
>  目前环境是flink 1.7.2,使用YARN Session模式提交任务,Hadoop 版本2.7.3,hdfs
> namenode配置了ha模式,提交任务的时候报以下错误,系统环境变量中已经设置了HADOOP_HOME,YARN_CONF_DIR,HADOOP_CONF_DIR,HADOOP_CLASSPATH,在flink_conf.yaml中配置了fs.hdfs.hadoopconf
> 
> 
>  2020-04-10 19:12:02,908 INFOnbsp;
> org.apache.flink.runtime.jobmaster.JobMasternbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; - Connecting to ResourceManager akka.tcp://flink@trusfortpoc1
> :23584/user/resourcemanager()
>  2020-04-10 19:12:02,909 INFOnbsp;
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolnbsp; nbsp;
> nbsp; nbsp; nbsp; - Cannot serve slot request, no
> ResourceManager connected. Adding as pending request
> [SlotRequestId{0feacbb4fe16c8c7a70249f1396565d0}]
>  2020-04-10 19:12:02,911 INFOnbsp;
> org.apache.flink.runtime.jobmaster.JobMasternbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; - Resolved ResourceManager address, beginning registration
>  2020-04-10 19:12:02,911 INFOnbsp;
> org.apache.flink.runtime.jobmaster.JobMasternbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; - Registration at ResourceManager attempt 1 (timeout=100ms)
>  2020-04-10 19:12:02,912 INFOnbsp;
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolnbsp; nbsp;
> nbsp; nbsp; nbsp; - Cannot serve slot request, no
> ResourceManager connected. Adding as pending request
> [SlotRequestId{35ad2384e9cd0efd30b43f5302db24b6}]
>  2020-04-10 19:12:02,913 INFOnbsp;
> org.apache.flink.yarn.YarnResourceManagernbsp; nbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;- Registering job manager
> 0...@akka.tcp://flink@trusfortpoc1:23584/user/jobmanager_0
> for job 24691b33c18d7ad73b1f52edb3d68ae4.
>  2020-04-10 19:12:02,917 INFOnbsp;
> org.apache.flink.yarn.YarnResourceManagernbsp; nbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;- Registered job manager
> 0...@akka.tcp://flink@trusfortpoc1:23584/user/jobmanager_0
> for job 24691b33c18d7ad73b1f52edb3d68ae4.
>  2020-04-10 19:12:02,919 INFOnbsp;
> org.apache.flink.runtime.jobmaster.JobMasternbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; - JobManager successfully registered at ResourceManager, leader
> id: .
>  2020-04-10 19:12:02,919 INFOnbsp;
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolnbsp; nbsp;
> nbsp; nbsp; nbsp; - Requesting new slot
> [SlotRequestId{35ad2384e9cd0efd30b43f5302db24b6}] and profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
>  2020-04-10 19:12:02,920 INFOnbsp;
> org.apache.flink.yarn.YarnResourceManagernbsp; nbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;- Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> 24691b33c18d7ad73b1f52edb3d68ae4 with allocation id
> AllocationID{5a12237c7f2bd8b1cc760ddcbab5a1c0}.
>  2020-04-10 19:12:02,921 INFOnbsp;
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolnbsp; nbsp;
> nbsp; nbsp; nbsp; - Requesting new slot
> [SlotRequestId{0feacbb4fe16c8c7a70249f1396565d0}] and profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
>  2020-04-10 19:12:02,924 INFOnbsp;
> org.apache.flink.yarn.YarnResourceManagernbsp; nbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;- Requesting new TaskExecutor container with resources
>   2020-04-10 19:12:02,926 INFOnbsp;
> org.apache.flink.yarn.YarnResourceManagernbsp; nbsp; nbsp;
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp;
> nbsp; nbsp;- Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> 24691b33c18d7ad73b1f52edb3d68ae4 with allocation id
> AllocationID{37dd666a18040bf63ffbf2e022b2ea9b}.
>  2020-04-10 19:12:0

Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-15 文章 tison
FYI

https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamp_extractors.html

IngestionTime 的时间基准是进入 Flink 系统即 Source 算子生产数据时附上的时间戳,你后面有
window/join,这个时间的间隔不是确定性的,大数据量下因为 GC 等原因有不同延迟从而导致 Window 划分不一致是正常的。即使使用
EventTime,在 Watermark
不够可靠的情况下也有可能不一致,足够可靠又可能受到滞后数据的影响损失性能并占用内存。可以看看上面文档的相关内容。

Best,
tison.


tison  于2020年4月15日周三 下午10:18写道:

> IngestionTime 多次运行结果不一样很正常啊,试试 event time?
>
> Best,
> tison.
>
>
> xuefli  于2020年4月15日周三 下午10:10写道:
>
>> 遇到一个非常头痛的问题
>>
>> Flink1.10的集群,用hdfs做backend
>>
>> 一个流aStream准备了10亿的数据,另外一个流bStream百万
>> 如果如下操作
>>
>> 我遇到一个问题 双流Join 
>> 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同
>>  。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后 
>> 再对cStream进行keyBy-->timeWindow-->sum.
>> 我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
>> 用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
>> 但数据量很大时,就会这样。
>>
>>
>> 每次计算的结果不一样,这个对业务系统挑战巨大
>>
>>
>> 发送自 Windows 10 版邮件应用
>>
>>


Re: 最新代码编译问题

2020-03-16 文章 tison
Hi,

You'd better use English in user mailing list.

If you prefer Chinese, you can post the email to user-zh@flink.apache.org .

Best,
tison.


tison  于2020年3月16日周一 下午4:25写道:

> 从 flink/ 根目录运行 mvn clean install -DskipTests
>
> 你这个问题是因为 impl 那些类是生成类,一般来说从根目录运行一次全量编译可以解决各种疑难杂症
>
> Best,
> tison.
>
>
> 吴志勇 <1154365...@qq.com> 于2020年3月16日周一 下午4:23写道:
>
>> 您好,
>> 我从github上下载了最新的代码。在IDEA中尝试编译,但是flink-table项目flink-sql-parser编译报错,
>>
>> test中也同样报错,
>>
>> 请问该如何解决呢?flink-sql-parser像是缺少了impl包呀。
>>
>


Re: 关于Flink1.10 Standalone 模式任务提交

2020-04-22 文章 tison
>》拿到ClassLoader后看下能不能取到KafkaTableSourceSinkFactory的class
>这个能拿到

你的意思是,UberJar 不放在 lib 里,在用户程序里通过线程上下文 ClassLoader 能加载到
KafkaTableSourceSinkFactory
吗?(同时 class loading 为 child-first)

如果是这样,听起来 client 的 classloading 策略没啥问题,似乎是 SPI 加载那边的 ClassLoader 有问题。之前
FileSystem 相关解析就出过类似的 ClassLoader 的 BUG

Best,
tison.


宇张  于2020年4月23日周四 上午11:36写道:

> 我尝试进行了添加,程序依然无法运行,异常信息和上面一致,下面是我的shade配置:
>
> 
> org.apache.maven.plugins
> maven-shade-plugin
> 
> 
> 
> package
> 
> shade
> 
> 
> 
> 
>
> implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>
> com.akulaku.data.main.StreamMain
> 
> 
>
> 
> 
> *:*
> 
> META-INF/*.SF
> META-INF/*.DSA
> META-INF/*.RSA
> 
> 
> 
>
> 
> 
> 
>
> org.apache.flink:flink-table-common
>
> org.apache.flink:flink-table-api-java
>
> org.apache.flink:flink-table-api-java-bridge_2.11
>
> org.apache.flink:flink-table-planner-blink_2.11
>
> org.apache.flink:flink-connector-kafka-0.11_2.11
>
> org.apache.flink:flink-connector-kafka-base_2.11
> org.apache.flink:flink-json
> 
> 
> 
> 
> 
> com.ibm.icu
>
> org.apache.flink.table.shaded.com.ibm.icu
> 
> 
> 
> 
> 
> 
>
>
> On Thu, Apr 23, 2020 at 10:53 AM Jingsong Li 
> wrote:
>
> > Hi,
> >
> > Flink的connector发现机制是通过java spi服务发现机制的,所以你的services下文件不包含Kafka相关的内容就不会加载到。
> >
> > > 而且两种打包方式运行时是都能加载到KafkaFactory类文件的
> >
> > 只是类文件是没有用的,没地方引用到它。
> >
> > 你试试[1]中的方法?添加combine.children
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-uber-blink/pom.xml#L104
> >
> > Best,
> > Jingsong Lee
> >
> > On Thu, Apr 23, 2020 at 10:37 AM 宇张  wrote:
> >
> > >
> > >
> >
> 我这面采用shade打包方式进行了尝试,发现依然运行出错,运行错误日志与assembly打包产生的错误日志一致,就是上面提到的错误,而且shade和assembly打包产生的
> > >
> > >
> >
> META-INF/services/org.apache.flink.table.factories.TableFactory文件及里面的内容一致,而且两种打包方式运行时是都能加载到KafkaFactory类文件的,所以貌似不是打包导致的问题,而更像是bug
> > > 下面是我maven插件配置:
> > >
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > org.apache.maven.plugins
> > > maven-shade-plugin
> > > 
> > > 
> > > 
> > > package
> > > 
> > > shade
> > > 
> > > 
> > > 
> > >  > >
> > >
> > >
> >
> implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> > >
> > > com.akulaku.data.main.StreamMain
> > > 
> > > 
> > >
> > > 
> > > 
> > > *:*
> > > 
> > > META-INF/*.SF
> > > META-INF/*.DSA
> > > META-INF/*.RSA
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > >
> > >
> > > On Wed, Apr 22, 2020 at 8:00 PM Jingsong Li 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > >
> > > >
> > >
> >
> 如果org.apache.flink.table.factories.TableFactory里面没有

Re: flink1.10关于jar包冲突问题

2020-04-22 文章 tison
能具体看一下报错吗?一般来说 Flink 自己需要的依赖都会 shaded 起来,不需要的传递依赖都应该 exclude 掉。暴露成 API
的类别一般需要封装或者使用稳定的接口。

这可能是一个工程上的问题,你可以具体罗列一下遇到的 JAR 包冲突问题,看一下怎么解。

Best,
tison.


宇张  于2020年4月22日周三 上午11:52写道:

> 在使用Flink1.10时,遇到最多的问题就是jar包冲突问题,okio这个包flink-parent引用的就有四个版本,还有一些没办法<
> exclusions>的包,请问社区有没有优化jar包冲突的提议。
>


Re: json中date类型解析失败

2020-04-22 文章 tison
应该是有内置的 UDF FROM_UNIXTIME 可以用的

Best,
tison.


Leonard Xu  于2020年4月22日周三 下午1:15写道:

> Hi
>  报错是因为'format.ignore-parse-errors'
> 参数是在社区最新的版本才支持的,FLINK-16725在1.11应该也会修复,如果需要使用的话可以等1.11发布后使用或者自己编译master分支,
> 即使有了这个参数你的问题也无法解决,对你的case每行记录都会解析错误所以会过滤掉所有数据。
> 建议你可以在数据源就转为标准的json格式或者写个udf将long转为timestamp后使用。
>
> 祝好,
> Leonard Xu
>
> > 在 2020年4月22日,12:33,王双利  写道:
> >
> > 要不你们再做一个fastjson版本的?
> > 目前内部解析用的都是fastjson
> >
> >
> >
> > 发件人: 王双利
> > 发送时间: 2020-04-22 12:31
> > 收件人: user-zh
> > 主题: 回复: Re: json中date类型解析失败
> >配置后报错误 ,
> > 'format.ignore-parse-errors' = 'true'
> > 这个参数需要怎么配置呢?
> > The matching candidates:
> >org.apache.flink.formats.json.JsonRowFormatFactory
> >Unsupported property keys:
> >format.ignore-parse-errors
> > WITH (
> > ..
> > 'format.type' = 'json',
> > 'format.ignore-parse-errors' = 'true',
> > 
> > )
> >
> >
> >
> > 发件人: Leonard Xu
> > 发送时间: 2020-04-22 12:18
> > 收件人: user-zh; 王双利
> > 主题: Re: json中date类型解析失败
> > Hi,
> > flink支持的json format是遵循RFC标准[1]的,不支持从long型转化为json timestamp, json的
> tiemstamp类型转化可以简单参考下,这个虽然符合标准,单对用户习惯来说确实不友好,目前社区也有一个jira[2]在跟进这个问题了。关于鲁棒性的问题,json
> format有个参数支持跳过解析错误的记录,'format.ignore-parse-errors' = 'true'
> >
> >
> > ```
> > Long time = System.currentTimeMillis();
> > DateFormat dateFormat =  new
> SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'");
> > Date date = new Date(time);
> > String jsonSchemaDate = dateFormat.format(date);
> > ```
> > [1]
> https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times
> > [2] https://issues.apache.org/jira/browse/FLINK-16725 <
> https://issues.apache.org/jira/browse/FLINK-16725>
> >
> > Best,
> > Leonard Xu
> >
> >> 在 2020年4月22日,12:05,王双利  写道:
> >>
> >> 使用  flink-json -1.10.0 解析json数据报下面的错误
> >>
> >> Caused by: java.time.format.DateTimeParseException: Text
> '1587527019680' could not be parsed at index 0
> >>
> >> 经检查 是 以下字段导致的
> >>
> {"jnlno":"e4574cce-8c9f-4d3f-974f-fc15250ec10d","ip":"122.96.41.218","channel":"pc","transdate":1587527019680,"event":"login","userid":"9","deviceid":"11","taskid":"1","retcode":"00","status":"fail"}
> >>
> >> 其中 transdate 是使用fastjson序列化得来的
> >>
> request.put("transdate",cal.getTime());JSON.toJSONString(request),解析失败后,系统直接停止,这个觉得也不太好吧,鲁棒性不够,万一其他系统发送一个错误格式的,系统直接挂掉,感觉不太合理。
> >> 以上的应该怎么解决才合适。
> >>
> >>
> >
>
>


Re: 关于Flink1.10 Standalone 模式任务提交

2020-04-22 文章 tison
虽然你放到 lib 下就能行了听起来是个 BUG,能不能说明一下你的 Flink 版本还有具体的启动命令。

FLINK-13749 可能在早期版本上没有,另外 Standalone 的类加载如果是 PerJob 有更改过。

Best,
tison.


tison  于2020年4月22日周三 下午5:48写道:

> 看下你打包的 UberJar 里有没一个内容包括
>
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>
> 的文件
>
> META-INF/services/org.apache.flink.table.factories.TableFactory
>
> Best,
> tison.
>
>
> 宇张  于2020年4月22日周三 下午5:30写道:
>
>> 我这面使用Standalone模式运行Flink任务,但是Uber
>> Jar里面的TableSourceFactory不能被加载,即使设置了classloader.resolve-order:
>> child-first,只有放在lib目录才能加载得到,我看发布文档跟改了类加载策略,但是我不知道为什么Uber
>> Jar里面的Factory不能被加载
>> Flink Client respects Classloading Policy (FLINK-13749
>> <https://issues.apache.org/jira/browse/FLINK-13749>)
>> <
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#flink-client-respects-classloading-policy-flink-13749
>> >
>>
>> The Flink client now also respects the configured classloading policy,
>> i.e., parent-first or child-first classloading. Previously, only cluster
>> components such as the job manager or task manager supported this setting.
>> This does mean that users might get different behaviour in their programs,
>> in which case they should configure the classloading policy explicitly to
>> use parent-first classloading, which was the previous (hard-coded)
>> behaviour.
>>
>> 异常信息:
>>
>>   rg.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: findAndCreateTableSource failed.
>> at
>>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at
>>
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at
>>
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: org.apache.flink.table.api.TableException:
>> findAndCreateTableSource failed.
>> at
>>
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>> at
>>
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>> at
>>
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
>> at
>>
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
>> at
>>
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
>> at
>>
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>> at
>>
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>>
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>> at
>>
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>> at
>>
>> org.apache.flink.table.planner.operatio

Re: 关于Flink1.10 Standalone 模式任务提交

2020-04-22 文章 tison
看下你打包的 UberJar 里有没一个内容包括

org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory

的文件

META-INF/services/org.apache.flink.table.factories.TableFactory

Best,
tison.


宇张  于2020年4月22日周三 下午5:30写道:

> 我这面使用Standalone模式运行Flink任务,但是Uber
> Jar里面的TableSourceFactory不能被加载,即使设置了classloader.resolve-order:
> child-first,只有放在lib目录才能加载得到,我看发布文档跟改了类加载策略,但是我不知道为什么Uber Jar里面的Factory不能被加载
> Flink Client respects Classloading Policy (FLINK-13749
> <https://issues.apache.org/jira/browse/FLINK-13749>)
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#flink-client-respects-classloading-policy-flink-13749
> >
>
> The Flink client now also respects the configured classloading policy,
> i.e., parent-first or child-first classloading. Previously, only cluster
> components such as the job manager or task manager supported this setting.
> This does mean that users might get different behaviour in their programs,
> in which case they should configure the classloading policy explicitly to
> use parent-first classloading, which was the previous (hard-coded)
> behaviour.
>
> 异常信息:
>
>   rg.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: findAndCreateTableSource failed.
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at
>
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: org.apache.flink.table.api.TableException:
> findAndCreateTableSource failed.
> at
>
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
> at
>
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2083)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
> at
>
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
> at
>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
> at
>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
> at
>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
> at
>
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
> at com.akulaku.data.main.StreamMain.main(StreamMain.java:87)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method

Re: 关于Flink1.10 Standalone 模式任务提交

2020-04-23 文章 tison
这个问题我建议你记一个 JIRA 然后提供一个可复现的程序。因为你如果是 Flink Standalone Session 模式,在 Client
端编译失败抛出如上异常,不应该跟放不放在 lib 下有什么关系。这边听你说感觉也很奇怪,可能需要本地复现一下比较好判断。

Best,
tison.


宇张  于2020年4月23日周四 上午11:53写道:

> 你的意思是,UberJar 不放在 lib 里,在用户程序里通过线程上下文 ClassLoader 能加载到
> KafkaTableSourceSinkFactory
> 吗?(同时 class loading 为 child-first)
> 》》是的
>
> On Thu, Apr 23, 2020 at 11:42 AM tison  wrote:
>
> > >》拿到ClassLoader后看下能不能取到KafkaTableSourceSinkFactory的class
> > >这个能拿到
> >
> > 你的意思是,UberJar 不放在 lib 里,在用户程序里通过线程上下文 ClassLoader 能加载到
> > KafkaTableSourceSinkFactory
> > 吗?(同时 class loading 为 child-first)
> >
> > 如果是这样,听起来 client 的 classloading 策略没啥问题,似乎是 SPI 加载那边的 ClassLoader 有问题。之前
> > FileSystem 相关解析就出过类似的 ClassLoader 的 BUG
> >
> > Best,
> > tison.
> >
> >
> > 宇张  于2020年4月23日周四 上午11:36写道:
> >
> > > 我尝试进行了添加,程序依然无法运行,异常信息和上面一致,下面是我的shade配置:
> > >
> > > 
> > > org.apache.maven.plugins
> > > maven-shade-plugin
> > > 
> > > 
> > > 
> > > package
> > > 
> > > shade
> > > 
> > > 
> > > 
> > >  > >
> > >
> > >
> >
> implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> > >
> > > com.akulaku.data.main.StreamMain
> > > 
> > > 
> > >
> > > 
> > > 
> > > *:*
> > > 
> > > META-INF/*.SF
> > > META-INF/*.DSA
> > > META-INF/*.RSA
> > > 
> > > 
> > > 
> > >
> > > 
> > > 
> > > 
> > >
> > > org.apache.flink:flink-table-common
> > >
> > > org.apache.flink:flink-table-api-java
> > >
> > > org.apache.flink:flink-table-api-java-bridge_2.11
> > >
> > > org.apache.flink:flink-table-planner-blink_2.11
> > >
> > > org.apache.flink:flink-connector-kafka-0.11_2.11
> > >
> > > org.apache.flink:flink-connector-kafka-base_2.11
> > > org.apache.flink:flink-json
> > > 
> > > 
> > > 
> > > 
> > > 
> > > com.ibm.icu
> > >
> > >
> org.apache.flink.table.shaded.com.ibm.icu
> > > 
> > > 
> > > 
> > > 
> > > 
> > > 
> > >
> > >
> > > On Thu, Apr 23, 2020 at 10:53 AM Jingsong Li 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Flink的connector发现机制是通过java
> > spi服务发现机制的,所以你的services下文件不包含Kafka相关的内容就不会加载到。
> > > >
> > > > > 而且两种打包方式运行时是都能加载到KafkaFactory类文件的
> > > >
> > > > 只是类文件是没有用的,没地方引用到它。
> > > >
> > > > 你试试[1]中的方法?添加combine.children
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-uber-blink/pom.xml#L104
> > > >
> > > > Best,
> > > > Jingsong Lee
> > > >
> > > > On Thu, Apr 23, 2020 at 10:37 AM 宇张  wrote:
> > > >
> > > > >
> > > > >
> > > >
> > >
> >
> 我这面采用shade打包方式进行了尝试,发现依然运行出错,运行错误日志与assembly打包产生的错误日志一致,就是上面提到的错误,而且shade和assembly打包产生的
> > > > >
> > > > >
> > > >
> > >
> >
> META-INF/services/org.apache.flink.table.factories.TableFactory文件及里面的内容一致,而且两种打包方式运行时是都能加载到KafkaFactory类文件的,所以貌似不是打包导致的问题,而更像是bug
> > > > > 下面是我maven插件配置:
> > > > >
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > >

Re: 关于Flink1.10 Standalone 模式任务提交

2020-04-23 文章 tison
另外你 shaded 里面去 shaded com.ibm.icu 也意义不明...

Best,
tison.


tison  于2020年4月23日周四 下午3:34写道:

> 这个问题我建议你记一个 JIRA 然后提供一个可复现的程序。因为你如果是 Flink Standalone Session 模式,在 Client
> 端编译失败抛出如上异常,不应该跟放不放在 lib 下有什么关系。这边听你说感觉也很奇怪,可能需要本地复现一下比较好判断。
>
> Best,
> tison.
>
>
> 宇张  于2020年4月23日周四 上午11:53写道:
>
>> 你的意思是,UberJar 不放在 lib 里,在用户程序里通过线程上下文 ClassLoader 能加载到
>> KafkaTableSourceSinkFactory
>> 吗?(同时 class loading 为 child-first)
>> 》》是的
>>
>> On Thu, Apr 23, 2020 at 11:42 AM tison  wrote:
>>
>> > >》拿到ClassLoader后看下能不能取到KafkaTableSourceSinkFactory的class
>> > >这个能拿到
>> >
>> > 你的意思是,UberJar 不放在 lib 里,在用户程序里通过线程上下文 ClassLoader 能加载到
>> > KafkaTableSourceSinkFactory
>> > 吗?(同时 class loading 为 child-first)
>> >
>> > 如果是这样,听起来 client 的 classloading 策略没啥问题,似乎是 SPI 加载那边的 ClassLoader 有问题。之前
>> > FileSystem 相关解析就出过类似的 ClassLoader 的 BUG
>> >
>> > Best,
>> > tison.
>> >
>> >
>> > 宇张  于2020年4月23日周四 上午11:36写道:
>> >
>> > > 我尝试进行了添加,程序依然无法运行,异常信息和上面一致,下面是我的shade配置:
>> > >
>> > > 
>> > > org.apache.maven.plugins
>> > > maven-shade-plugin
>> > > 
>> > > 
>> > > 
>> > > package
>> > > 
>> > > shade
>> > > 
>> > > 
>> > > 
>> > > > > >
>> > >
>> > >
>> >
>> implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>> > >
>> > > com.akulaku.data.main.StreamMain
>> > > 
>> > > 
>> > >
>> > > 
>> > > 
>> > > *:*
>> > > 
>> > > META-INF/*.SF
>> > > META-INF/*.DSA
>> > > META-INF/*.RSA
>> > > 
>> > > 
>> > > 
>> > >
>> > > 
>> > > 
>> > > 
>> > >
>> > > org.apache.flink:flink-table-common
>> > >
>> > > org.apache.flink:flink-table-api-java
>> > >
>> > > org.apache.flink:flink-table-api-java-bridge_2.11
>> > >
>> > > org.apache.flink:flink-table-planner-blink_2.11
>> > >
>> > > org.apache.flink:flink-connector-kafka-0.11_2.11
>> > >
>> > > org.apache.flink:flink-connector-kafka-base_2.11
>> > > org.apache.flink:flink-json
>> > > 
>> > > 
>> > > 
>> > > 
>> > > 
>> > > com.ibm.icu
>> > >
>> > >
>> org.apache.flink.table.shaded.com.ibm.icu
>> > > 
>> > > 
>> > > 
>> > > 
>> > > 
>> > > 
>> > >
>> > >
>> > > On Thu, Apr 23, 2020 at 10:53 AM Jingsong Li 
>> > > wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > Flink的connector发现机制是通过java
>> > spi服务发现机制的,所以你的services下文件不包含Kafka相关的内容就不会加载到。
>> > > >
>> > > > > 而且两种打包方式运行时是都能加载到KafkaFactory类文件的
>> > > >
>> > > > 只是类文件是没有用的,没地方引用到它。
>> > > >
>> > > > 你试试[1]中的方法?添加combine.children
>> > > >
>> > > > [1]
>> > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-uber-blink/pom.xml#L104
>> > > >
>> > > > Best,
>> > > > Jingsong Lee
>> > > >
>> > > > On Thu, Apr 23, 2020 at 10:37 AM 宇张  wrote:
>> > > >
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> 我这面采用shade打包方式进行了尝试,发现依然运行出错,运行错误日志与assembly打包产生的错误日志一致,就是上面提到的错误,而且shade和assembly打包产生的
>> > > > >
>> >

Re: flink barrier对齐 理解

2020-05-17 文章 tison
Hi,

你可以看一下官网这张经典的图[1][2],snapshot 是按算子级别来看的,跟 source 不 source 没啥关系,全局的 chk 由 jm
上的 checkpoint coordinator 协调。

Best,
tison.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/fig/stream_aligning.svg
[2]
https://ci.apache.org/projects/flink/flink-docs-master/concepts/stateful-stream-processing.html




了不起的盖茨比 <573693...@qq.com> 于2020年5月17日周日 下午2:50写道:

>
> 我的理解是一条数据,会经过n个算子,只有这个数据到达最后一个算子计算完毕,才能checkpoint,否则会导致前几个算子state改变,但是这个数据的offset没有被提交,导致了重复消费数据。
>
>
>
>
>
> -- 原始邮件 --
> 发件人: Benchao Li  发送时间: 2020年5月17日 13:28
> 收件人: user-zh  主题: 回复:flink barrier对齐 理解
>
>
>
> 我感觉应该是这样的:
>
> 比如有两个算子
> A ---hash--- B
>
> A和B分别有2和3个并发。那就是说对于B的某个subtask来讲,需要对齐上游A的2个subtask发过来的barrier,才能做checkpoint。
>
>
> 了不起的盖茨比 <573693...@qq.com 于2020年5月17日周日 下午1:16写道:
>
>  可以理解成,有多个subtask时候,需要等待不同subtask消费数据完毕,之后做checkpoint
> 
> 
> 
> 
> 
>  -- 原始邮件 --
>  发件人: Benchao Li   发送时间: 2020年5月17日 11:34
>  收件人: user-zh   主题: 回复:flink barrier对齐 理解
> 
> 
> 
>  Hi,
> 
>  我对这块不是非常了解,但是我理解的barrier对齐,指的是同一个Task的多个subtask之间对齐吧。
>  比如你只有一个source,然后经过keyby之后做了其他的操作,那也是存在barrier对齐的。
> 
>  了不起的盖茨比 <573693...@qq.comgt; 于2020年5月17日周日 上午11:29写道:
> 
>  gt; 请教一下,如果只有一个source,就不需要对齐了吧?只有source多个数据源时候才需要对齐?
> 
> 
> 
>  --
> 
>  Benchao Li
>  School of Electronics Engineering and Computer Science, Peking
> University
>  Tel:+86-15650713730
>  Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: 使用sql时候,设置了idle过期时间,但是状态还是一直变大

2020-05-17 文章 tison
考虑把 SQL 贴成 gist 链接?

Best,
tison.


claylin <1012539...@qq.com> 于2020年5月17日周日 下午5:32写道:

> sql作业定义如下,也通过TableConfig设置了最大和最小idle
> time,但是运行很长时间,查看sst的目录flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9下,状态还是在一直变打,导致作业线程读写state很耗时间,最后作业处于一直反压状态,求大佬支招CREATE
> TABLE yy_yapmnetwork_original ( happenAt BIGINT, uid BIGINT,
>  appId STRING, deviceId STRING, appVer STRING, dnsDur BIGINT,
>useGlb INT, hitCache INT, requestSize DOUBLE, responseSize
> DOUBLE, totalDur BIGINT, url STRING, statusCode INT,
>  prototype STRING, netType STRING, traceId STRING, ts AS
> CAST(FROM_UNIXTIME(happenAt/1000) AS TIMESTAMP(3)), WATERMARK FOR ts AS
> ts - INTERVAL '20' SECOND )with ( 'connector.type' = 'kafka',
> 'connector.version' = 'universal', 'connector.topic' = 'yapm_metrics',
> 'connector.properties.zookeeper.connect' = 'localhost:2181',
> 'connector.properties.bootstrap.servers' = 'kafkawx007-core001.yy.com:8101
> ,kafkawx007-core002.yy.com:8101,kafkawx007-core003.yy.com:8101', '
> connector.properties.group.id' = 'interface_success_rate_consumer',
> 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' );
> create table request_latency_tbl ( app_id string, app_ver string,
>net_type string, prototype string, url string, status_code
> int, w_start string, success_cnt BIGINT, failure_cnt BIGINT,
>  total_cnt BIGINT ) with( 'connector.type' = 'jdbc', 'connector.url' =
> 'jdbc:mysql://localhost:3315/yapm_metrics?useUnicode=truecharacterEncoding=utf-8zeroDateTimeBehavior=convertToNullautoReconnect=true',
> 'connector.table' = 'request_latency_statistics', 'connector.username' =
> 'yapm_metrics', 'connector.password' = '1234456',
> 'connector.write.flush.max-rows' = '1000', 'connector.write.flush.interval'
> = '5s', 'connector.write.max-retries' = '2' ); create view
> request_1minutes_latency  as select appId, appVer, netType, prototype,
> url, statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm') w_start,
>  count(distinct traceId) filter (where statusCode in (200)) as successCnt,
>count(distinct traceId) filter (where statusCode not in (200)) as
> failureCnt, count(distinct traceId) as total_cnt from
> yy_yapmnetwork_original group by appId, appVer, netType, prototype, url,
> statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm'); insert into
> request_latency_tbl select * from  request_1minutes_latency;


Re: save point容灾方案咨询

2020-05-17 文章 tison
这个我理解不在 Flink 的范畴里啊。你 savepoint 存到一个位置,然后外部挂一个同步器在主集群和容灾集群里同步(savepoint
目录)就可以吧。

Best,
tison.


zhisheng  于2020年5月17日周日 下午8:40写道:

> hi
>
> 如果做 Checkpoint 或者 Savepoint 的时候可以填两个 HDFS 集群的地址路径(一个是你的主集群/另一个是容灾集群)
> 是不是就可以解决你现在的问题,达到你想要的需求?
>
> Best
>
> zhisheng
>
> 请叫我雷锋 <854194...@qq.com> 于2020年5月17日周日 下午7:32写道:
>
> > 谢谢关注:
> >
> >
> > savepoint 容灾 是指的,每次执行savepoint生成的文件,能够在容灾集群上做备份。当主集群变得不可用时,可以将任务迁移到容灾
> > 集群进行根据savepoint 进行任务恢复。
> >
> >
> > --原始邮件--
> > 发件人:"Congxian Qiu" > 发送时间:2020年5月17日(星期天) 晚上6:01
> > 收件人:"user-zh" >
> > 主题:Re: save point容灾方案咨询
> >
> >
> >
> > 你好
> >
> > 请问这里的 savepoint 容灾的 “容灾” 具体是指什么呢?希望解决什么问题呢?
> >
> > Best,
> > Congxian
> >
> >
> > LakeShen  >
> >  Hi ,
> > 
> >  你可以把你的场景在描述的详细一些。
> > 
> >  Best,
> >  LakeShen
> > 
> >  请叫我雷锋 <854194...@qq.com 于2020年5月14日周四 下午9:42写道:
> > 
> >   各位大佬好,请问有啥好的save point容灾方案嘛?
> >  
> >  
> >  
> >   发自我的iPhone
> > 
>


Re: flink build-in 的 udf 的源码

2020-05-16 文章 tison
Hi Benchao,

我想搭车问一下这个代码生成是全局仅一次还是每个 call 都会走一遍流程?或者是其他策略。

Best,
tison.


Benchao Li  于2020年5月16日周六 下午9:50写道:

> Hi,
>
> Flink内置函数的实现方式跟udf不太一样,很多函数是直接用的代码生成来做的。
>
> 下面是以blink planner为例,大概说下流程:
> 1. FlinkSqlOperatorTable 这个类里面放的是内置函数表,这个表会被calcite parse
> SQL的时候用到,直接把这些函数识别为具体的某个函数定义。
> 2.
>
> 然后再代码生成阶段,会识别到这些函数,根据不同的函数定义,生成不同的函数实现调用。这部分你可以直接看下`org.apache.flink.table.planner.codegen.calls`这个package下的代码。
> 3. 上面第2条说的主要是scalar function的生成方式,agg
>
> function还要特殊一点,这部分可以参考下`org.apache.flink.table.planner.functions.aggfunctions`这个package下的代码。
>
>
> venn  于2020年5月16日周六 下午3:53写道:
>
> > 各位大佬,请问下,flink 内置的 udf 的源码在什么位置,还有在哪里完成的函数注
> > 册? 非常感谢各位大佬回复
> >
> >
> >
> > Thanks a lot !
> >
> >
> >
> >
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: 關於LocalExecutionEnvironment使用MiniCluster的配置

2020-05-24 文章 tison
是这样的。

这里的配置可以参考[1][2]两个类,具体你 Maven 启动的代码路径还跟[3][4]有关。

这边可能确实文档比较缺失。可以看下配置传递的路径,TM 的数量还有 RPC 的共享格式等配置,至少编程接口上都是可以配的。

Best,
tison.

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
[2]
https://github.com/apache/flink/blob/ab947386ed93b16019f36c50e9a3475dd6ad3c4a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
[3]
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
[4]
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java




月月  于2020年5月24日周日 下午9:11写道:

> 您好,
> 在單機模式使用maven執行專案時,會自動啟動MiniCluster,
> 我想請問在這種情形下,預設是配置一個JobManager以及一個TaskManager嗎?
>
> 找了一下文件中並沒有相關的說明。
>
> 感謝!
>


Re: 关于水位线Watermark的理解

2020-05-24 文章 tison
整体没啥问题,但是我看你说【假如第一个数据的事件时间刚好为12:00的,那么此时水位线应该在11:59】,这个 Watermark 跟
allowedLateness 没啥关系哈,是独立的逻辑。

文档层面你可以看看[1],源码你可以看看[2]里面检索 allowedLateness

Best,
tison.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#allowed-lateness
[2]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java



Benchao Li  于2020年5月24日周日 下午9:56写道:

> Hi,
> 你理解的是正确的,进入哪个窗口完全看事件时间,窗口什么时候trigger,是看watermark。
>
> smq <374060...@qq.com> 于2020年5月24日周日 下午9:46写道:
>
> >
> >
> 使用时间时间窗口处理关于数据延迟,加入允许延迟时间为1min,窗口大小是10min,那么在12:00-12:10这个窗口中,如果事件时间是在12:09:50这个数据在12:10:50这个数据到达,并且此时水位线刚好在12:09:50,那么这个延迟数据可以被处理,这个可以理解。
> >
> >
> 但是,假如第一个数据的事件时间刚好为12:00的,那么此时水位线应该在11:59,这个数据能进入12:00-12:10这个窗口被处理吗。按道理来说应该被正确处理。那么这样的话,进入窗口是按照事件时间,触发是按照水印时间。不知道这么理解对不对,这个问题想了很久。
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: 使用广播流要怎么保证广播流比数据流先到?

2020-05-24 文章 tison
高老师的方案应该是比较 make sense 的,你从网络上去限制某个先到后到很麻烦,而且就算可以,也会涉及 Flink
网络层很底层的逻辑。通常来说希望【先到】的含义是【先处理】,那你把物理上先到的缓存起来后处理就可以了。

Best,
tison.


1048262223 <1048262...@qq.com> 于2020年5月24日周日 下午2:08写道:

> Hello,我的理解是这样的
> 广播流一般都是为了减少访问外部配置数据,提高性能来使用的,因此如果你是在这种场景下使用播流,我有一个在生产实践过的方法可供参考。
>
> 可以先在正常数据处理流的open方法中初始化访问一次配置,后续配置变更时再去使用广播中的数据对配置进行更新。如果硬要求某些数据必须在某个广播流配置数据更新后才能进行处理,则可以使用大佬们在上面提供的用state存储的方式进行解决。
>
>
> -- 原始邮件 --
> 发件人: Yun Gao  发送时间: 2020年5月24日 13:56
> 收件人: 462329521 <462329...@qq.com, user-zh  
> 主题: 回复:使用广播流要怎么保证广播流比数据流先到?
>
>
>
> Hello,据我了解,现在应该法有办法做到让一个流先到。
>
> 一种workaround的方法应该是在广播流全部到达之前,通过state先缓存收到的数据;然后等到广播流到达后再进行处理。
>
>
> --
> Sender:462329521<462329...@qq.com
> Date:2020/05/24 11:32:17
> Recipient:user-zh Theme:使用广播流要怎么保证广播流比数据流先到?
>
> 你好,我想问一下我们在业务系统中需要广播流比数据流先到,要怎么保证这种先后顺序?


Re: 订阅

2020-10-08 文章 tison
Please send email with any content to -subscr...@flink.apache.org
for subscription.

For example, mailto:user-zh-subscr...@flink.apache.org to subscribe
user-zh@flink.apache.org

Best,
tison.


葛春法-18667112979  于2020年10月8日周四 下午8:45写道:

> I want to subscribe flink mail.


Re: 一个main方法启动2个yarn job问题

2020-08-28 文章 tison
应该说 SQL 的 update 会在底层也 call 一次 env.execute

如果你配的是所谓的 detach 模式,是有这种可能的。这个是实现问题,你可以先贴一下代码,然后描述你要的行为,看下可以怎么写

Best,
tison.


Rui Li  于2020年8月28日周五 下午9:59写道:

> 作业代码是怎么写的啊?按说写SQL的话不需要执行Env.execute
>
> On Fri, Aug 28, 2020 at 9:41 AM air23  wrote:
>
> > 你好。我有一个接kafka 写入tidb的任务 为什么会启动2个yarn任务去运行呢?
> > 我是先用datastream 接入kafka。然后转成table sql写入到tidb
> > 2个job name 一个叫Env.execute配置的名字
> > 一个是叫insert 写入tidb的sql语句名字
> >
> >
>
> --
> Best regards!
> Rui Li
>


Re: flink任务yarn perjob 提交任务如何设置job name

2020-09-30 文章 tison
代码里 env.execute("你的作业名")

Best,
tison.


丁浩浩 <18579099...@163.com> 于2020年9月30日周三 下午3:44写道:

> 如题,我需要设置flink提交到yarn的job name应该怎么设置呢?


Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-30 文章 tison
Hi Yang,

你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么?

如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。

Best,
tison.


Yang Peng  于2020年9月30日周三 上午10:29写道:

> 感谢回复,我们看了consumer的lag很小
> 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的
> 而且任务重启了没法jstack判断了
>
> hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道:
>
> >
> >
> >
> > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
> > 也可以 jstack 采下堆栈看下,GC等看下。
> > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
> > Best,
> > Hailong Wang
> > 在 2020-09-29 20:06:50,"Yang Peng"  写道:
> >
> >
> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
> > >flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
> > >
> > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
> > >
> > >>
> > >>
> > >>
> > >> Hi Yang Peng:
> > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
> > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
> > >> 2. Source 的序列化耗时严重,导致拉取变慢。
> > >> 可以尝试着扩kafka 分区,加大Source并发看下。
> > >> Best,
> > >> Hailong Wang
> > >>
> > >> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
> > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
> > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
> > >> >kafka消费没有积压,也没有反压,
> > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
> > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
> > >>
> >
>


Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-30 文章 tison
那有审计/监控的话看下每个节点的 in/out 记录呗,总能看到是哪一步跌了...

照你现在提供的信息听起来一切正常那就是业务逻辑本身输出少了,不然总得有哪里不一样。如果只有 sink 跌了,那就是 sink
有问题,比如可能依赖了外部环境或者内部积累错误等等。

Best,
tison.


Yang Peng  于2020年9月30日周三 下午5:26写道:

> 感谢回复,是的,之前确实怀疑是业务逻辑导致的
> 但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题
>
> tison  于2020年9月30日周三 下午2:33写道:
>
> > Hi Yang,
> >
> > 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么?
> >
> > 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。
> >
> > Best,
> > tison.
> >
> >
> > Yang Peng  于2020年9月30日周三 上午10:29写道:
> >
> > > 感谢回复,我们看了consumer的lag很小
> > > 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的
> > > 而且任务重启了没法jstack判断了
> > >
> > > hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道:
> > >
> > > >
> > > >
> > > >
> > > > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
> > > > 也可以 jstack 采下堆栈看下,GC等看下。
> > > > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
> > > > Best,
> > > > Hailong Wang
> > > > 在 2020-09-29 20:06:50,"Yang Peng"  写道:
> > > >
> > > >
> > >
> >
> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
> > > > >flinkkafkaconsumer消费的并行度也是90
> 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
> > > > >
> > > > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
> > > > >
> > > > >>
> > > > >>
> > > > >>
> > > > >> Hi Yang Peng:
> > > > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
> > > > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
> > > > >> 2. Source 的序列化耗时严重,导致拉取变慢。
> > > > >> 可以尝试着扩kafka 分区,加大Source并发看下。
> > > > >> Best,
> > > > >> Hailong Wang
> > > > >>
> > > > >> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
> > > > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
> > > > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
> > > > >> >kafka消费没有积压,也没有反压,
> > > > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
> > > > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
> > > > >>
> > > >
> > >
> >
>


Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-30 文章 tison
故障点的意思是从开始跌的地方重新消费吗?如果是这样那就是有问题,可以看看之前输出变少是正确数据输出慢了还是有些没输出了,慢了就得看看当时的环境,应该还是会有什么网络或者负载有波动的,没有可能就要怀疑监控系统有问题了;少输出了就是错了,可能是依赖的外部环境不稳定等等。

Best,
tison.


tison  于2020年9月30日周三 下午5:33写道:

> 那有审计/监控的话看下每个节点的 in/out 记录呗,总能看到是哪一步跌了...
>
> 照你现在提供的信息听起来一切正常那就是业务逻辑本身输出少了,不然总得有哪里不一样。如果只有 sink 跌了,那就是 sink
> 有问题,比如可能依赖了外部环境或者内部积累错误等等。
>
> Best,
> tison.
>
>
> Yang Peng  于2020年9月30日周三 下午5:26写道:
>
>> 感谢回复,是的,之前确实怀疑是业务逻辑导致的
>> 但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题
>>
>> tison  于2020年9月30日周三 下午2:33写道:
>>
>> > Hi Yang,
>> >
>> > 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么?
>> >
>> > 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。
>> >
>> > Best,
>> > tison.
>> >
>> >
>> > Yang Peng  于2020年9月30日周三 上午10:29写道:
>> >
>> > > 感谢回复,我们看了consumer的lag很小
>> > > 而且监控显示数据流入量也没明显变化但是感觉这部分数据只是offset被更新了但是数据没有消费到,这个问题之前没有遇到过 这是突发发现的
>> > > 而且任务重启了没法jstack判断了
>> > >
>> > > hailongwang <18868816...@163.com> 于2020年9月29日周二 下午10:35写道:
>> > >
>> > > >
>> > > >
>> > > >
>> > > > 不过也比较奇怪,Source 数据的 format的话,应该不会使得 CPU 降低,这期间 Iowait 高吗
>> > > > 也可以 jstack 采下堆栈看下,GC等看下。
>> > > > 至于 Source format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
>> > > > Best,
>> > > > Hailong Wang
>> > > > 在 2020-09-29 20:06:50,"Yang Peng"  写道:
>> > > >
>> > > >
>> > >
>> >
>> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
>> > > > >flinkkafkaconsumer消费的并行度也是90
>> 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
>> > > > >
>> > > > >hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
>> > > > >
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >> Hi Yang Peng:
>> > > > >> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
>> > > > >> 1. Kafka 集群和Flink 集群之间的带宽被其它打满了。
>> > > > >> 2. Source 的序列化耗时严重,导致拉取变慢。
>> > > > >> 可以尝试着扩kafka 分区,加大Source并发看下。
>> > > > >> Best,
>> > > > >> Hailong Wang
>> > > > >>
>> > > > >> 在 2020-09-29 19:44:44,"Yang Peng"  写道:
>> > > > >> >请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
>> > > > >> >kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
>> > > > >> >kafka消费没有积压,也没有反压,
>> > > > 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
>> > > > >> >tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
>> > > > >>
>> > > >
>> > >
>> >
>>
>


Re: 请教二阶段提交问题

2020-09-26 文章 tison
> 可是再次提交没有意义啊,没有数据[捂脸哭]

这个事儿是这样的,你用 checkpoint 之后呢没有反过来确认的 commit 会留在 state 里,所以重启的时候重新加载 state
的时候就会再提交一遍。然后向 kafka 这一类存储 commit offset 是幂等的,发现已经 commit 过就跳过就 OK 了。

Best,
tison.


蒋佳成(Jiacheng Jiang) <920334...@qq.com> 于2020年9月26日周六 下午4:01写道:

>
> 两阶段提交的第一阶段提交中,事务参与者反馈ok后需要作出之后一定能提交事务的承诺,事务参与者需要做些事来兑现承诺比如将事务操作持久化。在FlinkKafkaProducer中,preCommit就是调用了KafkaProducer的flush将数据刷到kafka中,在整个checkpoint完成后再提交事务,如果提交失败,会在job重启时再次提交事务。因此,我们需要保证的是preCommit成功后commit一定要能成功,这个需要根据具体写入的存储提供的特性来完成。
>
>
>
>
> -- 原始邮件 --
> 发件人: "高亮" 发送时间: 2020年9月25日(星期五) 中午11:14
> 收件人: "user-zh" 主题: 请教二阶段提交问题
>
>
>
> 各位大佬,请教一下二阶段提交的问题,preCommit预提交失败回滚已经很清楚了,就是在commit阶段提交如果失败会怎么,比较迷惑。
>
>
>
> 我自己测试了一下,发现只要是commit失败会造成数据丢失,但是看了下方法注释,说是失败了后会重启flink恢复到最近的state,继续提交,可是我在程序里有专门打印source输入的流数据,发现没有按到任何数据进入,也就是说flink重启后就直接调用commit再次提交。
>
>
> 可是再次提交没有意义啊,没有数据[捂脸哭]
>
>
> 所以请教大佬,当commit出现异常后,flink内部是如何解决的,作为flink应用者,如何正确使用避免和解决这类问题!


Re: 编译Flink时找不到scala-maven-plugin:3.1.4

2020-09-23 文章 tison
从日志看你的 scala 是 2.10 版本的,比较新版本的 flink 应该都只支持 2.11 和 2.12

Best,
tison.


Natasha <13631230...@163.com> 于2020年9月23日周三 下午4:00写道:

> Hi All,
> 很高兴加入Flink这个大家庭!但是有个问题困扰了我好久!
> 当我导入Flink到IDEA中准备进行编译,输入“mvn clean install -Drat.skip=true
> -Dmaven.test.skip=true -Dmaven.javadoc.skip=true -Dcheckstyle.skip=true”后,
> 报错“Failed to execute goal
> net.alchim31.maven:scala-maven-plugin:3.1.4:testCompile
> (scala-test-compile) on project flink-runtime_2.10”,
>
> 我尝试了网上的方法,修改pom.xml文件中scala-maven-plugin的版本,或者是让IDEA的Scala版本与Windows的Scala版本保持一致,但是都不起作用!
>
>
>
>
> Best,
> Natasha
>
>
> | |
> Natasha
> |
> |
> |
> 签名由网易邮箱大师定制


Re: 向flink push代码

2020-05-27 文章 tison
Flink 的特点就是快(x)

Best,
tison.


宇张  于2020年5月28日周四 上午10:56写道:

> 感谢大佬们,我看到  Leonard Xu大佬已经关注了FLINK-17991
> <https://issues.apache.org/jira/browse/FLINK-17991>这个,好快的响应速度
>
> On Thu, May 28, 2020 at 10:25 AM Leonard Xu  wrote:
>
> > Hi,
> > Yangze 贴了官方教程,也可以看下 Jark 的博客[1],中文的看起来会快一些。
> >
> > Best,
> > Leonard Xu
> > [1] https://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/ <
> > https://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/>
> >
> > > 在 2020年5月28日,10:18,Yangze Guo  写道:
> > >
> > > 您好,社区的贡献代码教程[1]。
> > >
> > > Tips: 如果不是hotfix,请提交前确认已经在ticket下达成一致并且有committer将其assign给您。
> > >
> > > [1] https://flink.apache.org/zh/contributing/contribute-code.html
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Thu, May 28, 2020 at 10:15 AM 宇张  wrote:
> > >>
> > >> 找打了教程了
> > >>
> > >>
> > >> On Thu, May 28, 2020 at 10:10 AM 宇张  wrote:
> > >>
> > >>> hi,
> > >>> 请问给flink社区push代码的流程是怎么样的哪,有没有小白教程啊。。
> > >>>
> >
> >
>


Re: RichMapFunction的问题

2020-05-24 文章 tison
关于第一个问题,最好细化一下【各种问题】是什么问题。

关于第二个问题,我印象中目前 Flink 不支持按并发(SubTask)级别指定调度的位置,绕过方案可以是设置每个 TM 仅持有一个
Slot。这方面我抄送 Xintong,或许他的工作能帮到你。

Best,
tison.


xue...@outlook.com  于2020年5月25日周一 上午11:29写道:

> 遇到两个问题:
>   背景:flink v1.10集群,几十台主机,每台CPU 16,内存 50G,整个job的并发是200
>   比如我的一个RichMapFunction在open中会加载存量数据。
>   因维度数据和主数据是非常离散的,因此这些维度数据都需要加载到内存
>
> 1、这个RichMapFunction的并发度上不去,只能整到4,并发度上去后各种问题,但从主机内存以及分配给taskmanager的内存足够;
>
>
> 2、这个RichMapFunction的所有slot都分配到同一个taskmanager上,即同一个主机。没有找到接口可以分散到不同的taskmanager上;
>
> 说简单点:
>
> 1、 对于RichMapFunction的open中需要加载大量维度数据,并发度上不去受什么影响;
>
> 2、 对于一个算子如何干预使其分散到不同的taskmanager上;
>
>
>
>
> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用
>
>


Re: 关于水位线Watermark的理解

2020-05-26 文章 tison
最近刚好看到张俊老师的 Flink 分享 [1],这个里面对你想了解的部分介绍得很详细,可以结合阅读(x)

Best,
tison.

[1] https://files.alicdn.com/tpsservice/73a1f1c404d2a658585cf4f4d86ef776.pdf


smq <374060...@qq.com> 于2020年5月24日周日 下午10:25写道:

> 恩恩,我是刚接触flink不久,所以很多地方没有很清楚,谢谢指点
>
>
> ---原始邮件---
> 发件人: tison 发送时间: 2020年5月24日(周日) 晚上10:10
> 收件人: user-zh 主题: Re: 关于水位线Watermark的理解
>
>
> 整体没啥问题,但是我看你说【假如第一个数据的事件时间刚好为12:00的,那么此时水位线应该在11:59】,这个 Watermark 跟
> allowedLateness 没啥关系哈,是独立的逻辑。
>
> 文档层面你可以看看[1],源码你可以看看[2]里面检索 allowedLateness
>
> Best,
> tison.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#allowed-lateness
> [2]
>
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
>
>
>
> Benchao Li 
>  Hi,
>  你理解的是正确的,进入哪个窗口完全看事件时间,窗口什么时候trigger,是看watermark。
> 
>  smq <374060...@qq.com 于2020年5月24日周日 下午9:46写道:
> 
>  
>  
> 
> 使用时间时间窗口处理关于数据延迟,加入允许延迟时间为1min,窗口大小是10min,那么在12:00-12:10这个窗口中,如果事件时间是在12:09:50这个数据在12:10:50这个数据到达,并且此时水位线刚好在12:09:50,那么这个延迟数据可以被处理,这个可以理解。
>  
>  
> 
> 但是,假如第一个数据的事件时间刚好为12:00的,那么此时水位线应该在11:59,这个数据能进入12:00-12:10这个窗口被处理吗。按道理来说应该被正确处理。那么这样的话,进入窗口是按照事件时间,触发是按照水印时间。不知道这么理解对不对,这个问题想了很久。
> 
> 
> 
>  --
> 
>  Best,
>  Benchao Li
> 


Re: 全局state

2020-05-26 文章 tison
任意并行度全局状态从物理上就是不可行的,你可以了解一下分布式计算系统怎么部署物理作业的。“全局状态”要么依赖外部存储要么依赖实现(部署)细节。

你这个需求能不能自定义 KeyBy 细节(KeySelector)来实现?相关文档见
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/api_concepts.html#define-keys-using-key-selector-functions

Best,
tison.


star <3149768...@qq.com> 于2020年5月26日周二 下午6:42写道:

> 请问,有全局状态组件吗?我有一个需求需要对数据里的id和name做映射,也就是如果两条数据的id或者name相同则映射成一个值;现在只能使用operator
> state,并且并行度设置为1,来实现全局state
>
>
> 谢谢
>
> 发自我的iPhone


Re: Kafka Consumer反序列化错问题

2020-05-29 文章 tison
这个原因应该是类加载的顺序问题,你配置一下 child-first 的类加载,如果是 perjob 1.10 上我记得是要指定某个配置。

参考这个文档
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#user-jars--classpath

Best,
tison.


Even <452232...@qq.com> 于2020年5月29日周五 下午6:48写道:

> 谢谢,请问需要怎么处理避免这个问题?
>
>
>
>
> --原始邮件--
> 发件人:"zz zhang" 发送时间:2020年5月29日(星期五) 下午5:16
> 收件人:"user-zh" jkill...@dingtalk.com;
>
> 主题:Re: Kafka Consumer反序列化错问题
>
>
>
> 应该是maven-shade那边配置问题,
>
> 原始class文件org.apache.kafka.common.serialization.ByteArrayDeserializer被改写为org.
> apache.flink.kafka.shaded.org
> .apache.kafka.common.serialization.ByteArrayDeserializer之后,所实现的接口Deserializer也被shade处理了,可能被是org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer,所以导致异常
>
> 夏帅  
>  可以排除一下是否是jar包冲突
> 
> 
>  --
>  发件人:Even <452232...@qq.com
>  发送时间:2020年5月29日(星期五) 16:17
>  收件人:user-zh   主 题:Kafka Consumer反序列化错问题
> 
>  Hi!
>  请教一个Kafka Consumer反序列问题:
>  一个kafkanbsp;consumernbsp;job 提交到Flink session
> cluster时运行稳定,但是独立提交到到Flink per-job cluster 就报kafka反序列化错,报错信息如下:
>  其中flink版本为1.10,kafka版本为kafka_2.12-2.1.0;代码中consumer配置为val data =
> env.addSource(new FlinkKafkaConsumer[String](topic, new
> SimpleStringSchema(), properties))
>  2020-05-27nbsp;17:05:22
>  org.apache.kafka.common.KafkaException: Failed to construct kafka
> consumer
>  at
> org.apache.kafka.clients.consumer.KafkaConsumer.  at
> org.apache.kafka.clients.consumer.KafkaConsumer.  at
> org.apache.kafka.clients.consumer.KafkaConsumer.  at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>  at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>  at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
>  at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>  at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: org.apache.kafka.common.KafkaException:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
> is not an instance of org.apache.kafka.common.serialization.Deserializer
>  at
> org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
>  at
> org.apache.kafka.clients.consumer.KafkaConsumer.  ... 15 more
>
>
>
> --
> Best,
> zz zhang


Re: Kafka Consumer反序列化错问题

2020-05-29 文章 tison
另外关于类加载的一般性文档,可以看下这个

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html

Best,
tison.


tison  于2020年5月29日周五 下午7:46写道:

> 这个原因应该是类加载的顺序问题,你配置一下 child-first 的类加载,如果是 perjob 1.10 上我记得是要指定某个配置。
>
> 参考这个文档
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#user-jars--classpath
>
> Best,
> tison.
>
>
> Even <452232...@qq.com> 于2020年5月29日周五 下午6:48写道:
>
>> 谢谢,请问需要怎么处理避免这个问题?
>>
>>
>>
>>
>> --原始邮件--
>> 发件人:"zz zhang"> 发送时间:2020年5月29日(星期五) 下午5:16
>> 收件人:"user-zh"> jkill...@dingtalk.com;
>>
>> 主题:Re: Kafka Consumer反序列化错问题
>>
>>
>>
>> 应该是maven-shade那边配置问题,
>>
>> 原始class文件org.apache.kafka.common.serialization.ByteArrayDeserializer被改写为org.
>> apache.flink.kafka.shaded.org
>> .apache.kafka.common.serialization.ByteArrayDeserializer之后,所实现的接口Deserializer也被shade处理了,可能被是org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer,所以导致异常
>>
>> 夏帅 > 
>>  可以排除一下是否是jar包冲突
>> 
>> 
>>  --
>>  发件人:Even <452232...@qq.com
>>  发送时间:2020年5月29日(星期五) 16:17
>>  收件人:user-zh >  主 题:Kafka Consumer反序列化错问题
>> 
>>  Hi!
>>  请教一个Kafka Consumer反序列问题:
>>  一个kafkanbsp;consumernbsp;job 提交到Flink session
>> cluster时运行稳定,但是独立提交到到Flink per-job cluster 就报kafka反序列化错,报错信息如下:
>>  其中flink版本为1.10,kafka版本为kafka_2.12-2.1.0;代码中consumer配置为val data =
>> env.addSource(new FlinkKafkaConsumer[String](topic, new
>> SimpleStringSchema(), properties))
>>  2020-05-27nbsp;17:05:22
>>  org.apache.kafka.common.KafkaException: Failed to construct kafka
>> consumer
>>  at
>> org.apache.kafka.clients.consumer.KafkaConsumer.>  at
>> org.apache.kafka.clients.consumer.KafkaConsumer.>  at
>> org.apache.kafka.clients.consumer.KafkaConsumer.>  at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>>  at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>>  at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
>>  at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>  at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>>  at
>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>  at java.lang.Thread.run(Thread.java:748)
>>  Caused by: org.apache.kafka.common.KafkaException:
>> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
>> is not an instance of org.apache.kafka.common.serialization.Deserializer
>>  at
>> org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
>>  at
>> org.apache.kafka.clients.consumer.KafkaConsumer.>  ... 15 more
>>
>>
>>
>> --
>> Best,
>> zz zhang
>
>


Re: flink1.10 on yarn 问题

2020-05-29 文章 tison
然后你 execute 前后的代码片段甚至整个 main 如果可以的话通过 gist 贴一下(x)

Best,
tison.


tison  于2020年5月29日周五 下午2:21写道:

> 这个问题好诡异啊,一般来说编译会在 env.execute
> 的时候拦截,不应该真的调度起来才对。你能详细描述一下你提交作业的方法还有这个错误报在哪里吗(client?cluster?)?
>
> Best,
> tison.
>
>
> air23  于2020年5月29日周五 下午1:38写道:
>
>> cdh运行flink1.10 on cdh yarn 报错如下。 用1.7.2版本就没有问题
>> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar 也加了
>> hadoop环境变量 export HADOOP_CONF_DIR=/etc/hadoop/conf
>> 求解答
>>
>>
>>
>>
>>
>>
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error:
>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>> (JobID: e358699c1be6be1472078771e1fd027f)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>>
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
>>
>> Caused by: java.util.concurrent.ExecutionException:
>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>> (JobID: e358699c1be6be1472078771e1fd027f)
>>
>> at
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>
>> at
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>
>> at
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>>
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>
>> at
>> tt.WordCountStreamingByJava.main(WordCountStreamingByJava.java:36)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:498)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>
>> ... 11 more
>>
>> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>> Job failed (JobID: e358699c1be6be1472078771e1fd027f)
>>
>> at
>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>>
>> at
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>>
>> at
>> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
>>
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>>
>>

Re: Re: flink1.10 on yarn 问题

2020-05-29 文章 tison
你运行的命令是啥?然后在哪个目录下运行的,和 flink 下载下来解压的目录是什么相对关系?

Best,
tison.


air23  于2020年5月29日周五 下午2:35写道:

> 代码就是flink自带的例子。
>
> public class WordCountStreamingByJava {
> public static void main(String[] args) throws Exception {
>
> // 创建执行环境
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> // 设置socket数据源
> DataStreamSource source = env.socketTextStream("zongteng75", 9001,
> "\n");
>
> // 转化处理数据
> DataStream dataStream = source.flatMap(new
> FlatMapFunction() {
> @Override
> public void flatMap(String line, Collector collector)
> throws Exception {
>
> System.out.println(line);
> for (String word : line.split(" ")) {
> collector.collect(new WordWithCount(word, 1));
> }
> }
> }).keyBy("word")//以key分组统计
> .timeWindow(Time.seconds(2),Time.seconds(2))//设置一个窗口函数,模拟数据流动
> .sum("count");//计算时间窗口内的词语个数
>
> // 输出数据到目的端
> dataStream.print();
>
> // 执行任务操作
> env.execute("Flink Streaming Word Count By Java");
>
> }
>
>
>
>
> 我现在加了flink环境变量 这个例子 可以过了。就很奇怪
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-05-29 14:22:39,"tison"  写道:
> >然后你 execute 前后的代码片段甚至整个 main 如果可以的话通过 gist 贴一下(x)
> >
> >Best,
> >tison.
> >
> >
> >tison  于2020年5月29日周五 下午2:21写道:
> >
> >> 这个问题好诡异啊,一般来说编译会在 env.execute
> >> 的时候拦截,不应该真的调度起来才对。你能详细描述一下你提交作业的方法还有这个错误报在哪里吗(client?cluster?)?
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> air23  于2020年5月29日周五 下午1:38写道:
> >>
> >>> cdh运行flink1.10 on cdh yarn 报错如下。 用1.7.2版本就没有问题
> >>> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar 也加了
> >>> hadoop环境变量 export HADOOP_CONF_DIR=/etc/hadoop/conf
> >>> 求解答
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> org.apache.flink.client.program.ProgramInvocationException: The main
> >>> method caused an error:
> >>> org.apache.flink.client.program.ProgramInvocationException: Job failed
> >>> (JobID: e358699c1be6be1472078771e1fd027f)
> >>>
> >>> at
> >>>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >>>
> >>> at
> >>>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >>>
> >>> at
> >>>
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> >>>
> >>> at
> >>>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
> >>>
> >>> at
> >>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> >>>
> >>> at
> >>>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
> >>>
> >>> at
> >>>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> >>>
> >>> at java.security.AccessController.doPrivileged(Native Method)
> >>>
> >>> at javax.security.auth.Subject.doAs(Subject.java:422)
> >>>
> >>> at
> >>>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> >>>
> >>> at
> >>>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >>>
> >>> at
> >>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> >>>
> >>> Caused by: java.util.concurrent.ExecutionException:
> >>> org.apache.flink.client.program.ProgramInvocationException: Job failed
> >>> (JobID: e358699c1be6be1472078771e1fd027f)
> >>>
> >>> at
> >>>
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> >>>
> >>> at
> >>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> >>>
> >>> at
> >>>
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
> >>>
> >>> at
> >>>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.e

Re: flink 1.10webui不显示print内容

2020-05-26 文章 tison
你这个程序看起来不能通过 Web UI 提交。Flink 依赖内部异常在 Web UI 提交的路径里做编译。你这直接 Catch 了是拿不到作业图的。

你这个作业真的起来了吗?

具体提交的操作怎么样的,除了你要的 taskmanager.out 没有,有啥?

Best,
tison.


smq <374060...@qq.com> 于2020年5月27日周三 上午7:34写道:

> FlinkKafkaConsumer011 FlinkKafkaConsumer011<(topic, new SimpleStringSchema(), properties);
> consumer.setStartFromLatest();
> DataStreamSource SingleOutputStreamOperator stream.map(
> new MapFunction int num = 0;
>
> @Override
> public Tuple2 num++;
> if (num % 10 == 0) {
>
> System.out.println("出现错误,即将重启");
> throw new RuntimeException("出现错误,程序重启!");
> } else {
> return new Tuple2(s, 1);
> }
> }
> }).keyBy(0)
> .sum(1);
>
> sum.print();
> try {
> env.execute();
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
> 这个是部分代码
>
>
> --原始邮件--
> 发件人:"Benchao Li" 发送时间:2020年5月26日(星期二) 晚上11:29
> 收件人:"user-zh"
> 主题:Re: flink 1.10webui不显示print内容
>
>
>
> 你可以给我们看一下你是怎么print的么?
>
> smq <374060...@qq.com 于2020年5月26日周二 下午11:20写道:
>
>  我这个在集群上提交或者webui提交都看不到输出内容,这应该不是client吧
> 
> 
>  ---原始邮件---
>  发件人: quot;Lijie Wangquot;  发送时间: 2020年5月26日(周二) 晚上10:14
>  收件人: quot;user-zh@flink.apache.orgquot;<
> user-zh@flink.apache.orggt;;
>  主题: 回复:flink 1.10webui不显示print内容
> 
> 
>  这个是不需要配置并且所有版本都支持的,你可以看一下 taskmanager.out 的输出内容。 此外,你需要确认一下你 print
>  的逻辑是否属于在 TM 端执行,有可能是在 client 端被执行的。
> 
> 
> 
> 
>  在2020年05月26日 21:39,smq<374060...@qq.comgt; 写道:
>  Hi
>  我的代码中打印的结果不能在webui上stdout看到,但是网上看的博客有人是可以显示打印内容的,只不过不是1.10版本。
>  请问是配置的问题还是这个版本不支持呢
>
>
>
> --
>
> Best,
> Benchao Li


Re: 订阅中文邮件列表

2020-06-02 文章 tison
请发送任意邮件到 user-zh-subscr...@flink.apache.org 订阅。

Best,
tison.


li wei  于2020年6月2日周二 下午7:36写道:

> 中文邮件列表
>


Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-12 文章 tison
那就要看下你是什么 Flink 版本,怎么提交到 YARN 上的,以及 YARN 的日志上的 classpath 是啥了

Best,
tison.


王松  于2020年7月13日周一 下午12:54写道:

> 各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
> classpath.
> 请问是什么原因导致的呢?
>
> 代码如下:
>
>
> -
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings =
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env,
> settings);
>
> tenv.executeSql("CREATE TABLE test_table (\n" +
> " id INT,\n" +
> " name STRING,\n" +
> " age INT,\n" +
> " create_at TIMESTAMP(3)\n" +
> ") WITH (\n" +
> " 'connector' = 'kafka',\n" +
> " 'topic' = 'test_json',\n" +
> " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
> " 'properties.group.id' = 'testGroup',\n" +
> " 'format' = 'json',\n" +
> " 'scan.startup.mode' = 'latest-offset'\n" +
> ")");
> Table table = tenv.sqlQuery("select * from test_table");
> tenv.toRetractStream(table, Row.class).print();
> env.execute("flink 1.11.0 demo");
>
> -
>
> pom 文件如下:
> =
> 
> 2.11
> 1.11.0
> 
> 
> 
> org.apache.flink
>
> flink-table-planner-blink_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
>
> flink-table-runtime-blink_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
> flink-json
> ${flink.version}
> 
> 
> org.apache.flink
>
> flink-connector-kafka_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
>
> flink-connector-kafka-0.11_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
>
> flink-connector-kafka_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
> flink-core
> ${flink.version}
> 
> 
> org.apache.flink
> flink-clients_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
> flink-table-common
> ${flink.version}
> 
> 
> =
>


Re: Flink DataStream 统计UV问题

2020-07-09 文章 tison
你这个需求貌似是要看一天的 UV 的实时更新量,可以看一下 sliding window。如果是每天 0 点清零,实时看今天的
UV,那就是另一个问题了,应该需要自己定义 trigger & evictor

每条触发一次 window...看你数据量吧

Best,
tison.


shizk233  于2020年7月10日周五 上午10:23写道:

> Hi Jiazhi,
>
>
> 1.如果数据流量不是很大的话,按每条数据触发也没问题。另外,基于事件时间的情况,提前触发可以选择ContinuousEventTimeTrigger,可以查看Trigger接口的实现找到你想要的trigger。
> 2.窗口结束后会自动释放。一般对于Global窗口需要手动设置TTL
>
> Best,
> shizk233
>
> ゞ野蠻遊戲χ  于2020年7月7日周二 下午10:27写道:
>
> > 大家好!
> >
> >  想问下,现在在用DataStream的api来统计每天的UV,代码如下,有2个使用问题:
> > 1、在使用Tumbling窗口的时候,由于使用窗口跨度是1天(Time.days(1)),只有以一天结束的时候,才能输出一个uv值,
> > 这样时间等待太长了,所以加了一个trigger,每来一条都触发一次窗口,不知道这样的用法没有问题。
> > 2、还有想问下在窗口结束后,里面的state状态会自动释放吗?还是要自己手动设置TTL的。
> >
> >
> > 谢谢!
> > Jiazhi
> >
>


Re: flink 高可用问题

2020-06-22 文章 tison
你看一下你的 chk 间隔,看起来是作业还没调度起来就开始 chk 所以失败。可能原因资源不足,调度不起来或者调度得慢,你 chk 间隔又小,就这样了。

如果是一直 chk 以这个方式失败,应该看下调度的日志为啥迟迟调不起来

Best,
tison.


Yichao Yang <1048262...@qq.com> 于2020年6月22日周一 上午10:57写道:

> Hi
>
>
> 看日志应该只是INFO,而不是错误,你的job是做不了checkpoint吗?
>
>
> Best,
> Yichao Yang
>
>
>
>
> --原始邮件--
> 发件人:"Tony" 发送时间:2020年6月22日(星期一) 上午10:54
> 收件人:"user-zh"
> 主题:flink 高可用问题
>
>
>
> 你好。
>
>
> 我按着官方文档配置了flink的高可用(flink-conf.yaml)如下:
> high-availability:zookeeper
> high-availability.zookeeper.quorum:master:2181 ,slave1:2181,slave2:2181
> high-availability.zookeeper.path.root:/flink
> high-availability.cluster-id:/cluster_one
> highavailability.storageDir:hdfs://master:9000/flink/ha
>
>
> 我的flink和zookeeper都是在K8s的容器中
> job启动出现如下问题:麻烦帮忙看一下,谢谢。
> 2020-06-22 02:47:43,884 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Checkpoint triggering task Source:Kafka-Consumer - (Sink: Print to
> Std. Out, Filter -Query Map - Unwind - Custom Map - filter
> - Data Transformation - Filter) (1/1) of job
>  is not in state RUNNING but SCHEDULED
> instead. Aborting checkpoint.


Re: flink JobManager HA 异常 the fencing token is null

2020-06-09 文章 tison
你可以详细说一下场景,这个我想了一下应该是你选举窗口太长了

0. 某个时候,Dispatcher 选出了 Leader 并发布自己的地址
1. 某个组件向 Dispatcher 发了个消息,你这里前端点击之后后端 WebMonitor 给 Dispatcher 发
requestMultipleJobDetails
消息
2. Dispatcher 跟 zk 链接抖动,丢 leader 了。早期版本会把这个 fencing token 设置成 null
3. 1 里面的消息到达 Dispatcher,Dispatcher 走 fencing token 逻辑,看到是 null
4. 抛出此异常

如果稍后又选举成功,这里的异常应该是 fencing token mismatch 一类的

Best,
tison.


tison  于2020年6月9日周二 下午9:15写道:

> 啥 flink 版本啊?1.10 Dispatcher 魔改之后应该不会 null 的
>
> Best,
> tison.
>
>
> whirly  于2020年6月9日周二 下午8:58写道:
>
>> 大家好:
>> 环境一些作业出现下面异常,怀疑是Flink JobManager配置HA的问题,请问可能是什么问题?
>>
>>
>> 异常信息:
>> Internal server error.,
>> > side:\norg.apache.flink.runtime.rpc.exceptions.FencingTokenException:
>>  Fencing token not set: Ignoring message
>> LocalFencedMessage(93428232915e7c6e4947aaa5910341a8,
>> LocalRpcInvocation(requestMultipleJobDetails(Time)))
>> sent to akka.tcp://flink@cluster2-host1:40443/user/dispatcher because
>> the fencing token is null.
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> End of exception on server side>
>
>


Re: flink JobManager HA 异常 the fencing token is null

2020-06-09 文章 tison
啥 flink 版本啊?1.10 Dispatcher 魔改之后应该不会 null 的

Best,
tison.


whirly  于2020年6月9日周二 下午8:58写道:

> 大家好:
> 环境一些作业出现下面异常,怀疑是Flink JobManager配置HA的问题,请问可能是什么问题?
>
>
> 异常信息:
> Internal server error.,
>  side:\norg.apache.flink.runtime.rpc.exceptions.FencingTokenException:
>  Fencing token not set: Ignoring message
> LocalFencedMessage(93428232915e7c6e4947aaa5910341a8,
> LocalRpcInvocation(requestMultipleJobDetails(Time)))
> sent to akka.tcp://flink@cluster2-host1:40443/user/dispatcher because the
> fencing token is null.
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> End of exception on server side>


Re: flink JobManager HA 异常 the fencing token is null

2020-06-09 文章 tison
噢,那应该就是上面说的问题了

你的 Dispatcher 能被发现说明一开始选主和发布是 ok 的,你可以贴一下 HA
的配置,看看有没特别不靠谱的,然后去日志里找一下丢 leadership 的日志,一般来说前后会有一堆 zk 链接 ConnectionLoss 或者
SessionExpire 的日志

Best,
tison.


whirly  于2020年6月9日周二 下午9:23写道:

> Flink 1.8
>
>
>
>
> | |
> whirly
> |
> |
> 邮箱:whir...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年06月09日 21:15,tison 写道:
> 啥 flink 版本啊?1.10 Dispatcher 魔改之后应该不会 null 的
>
> Best,
> tison.
>
>
> whirly  于2020年6月9日周二 下午8:58写道:
>
> > 大家好:
> > 环境一些作业出现下面异常,怀疑是Flink JobManager配置HA的问题,请问可能是什么问题?
> >
> >
> > 异常信息:
> > Internal server error.,
> >  > side:\norg.apache.flink.runtime.rpc.exceptions.FencingTokenException:
> >  Fencing token not set: Ignoring message
> > LocalFencedMessage(93428232915e7c6e4947aaa5910341a8,
> > LocalRpcInvocation(requestMultipleJobDetails(Time)))
> > sent to akka.tcp://flink@cluster2-host1:40443/user/dispatcher because
> the
> > fencing token is null.
> > at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> > at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> > at
> >
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> > at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> > at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> > at
> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > End of exception on server side>
>


Re: flink精准一次性消费问题

2020-06-11 文章 tison
>checkpoint的配置有什么要求吗?

配成 EXACTLY_ONCE

>还有就是kafka的事务提交多久能提交一次,可配置吗?

chk 的时候提交,这里面深究的话有点并发问题,可以看 TwoPhaseCommitSink 的细节
配置这个事儿...有能力自定义,但是为啥要这么做呢呢

Best,
tison.


蒋佳成(Jiacheng Jiang) <920334...@qq.com> 于2020年6月11日周四 下午4:59写道:

> checkpoint完成通知里提交的事务
>
>
>
>
> --原始邮件--
> 发件人: "胡云川" 发送时间: 2020年6月11日(星期四) 下午4:56
> 收件人: "user-zh" 主题: 回复:flink精准一次性消费问题
>
>
>
> gt;Hi
> gt;这些问题都已经排查过了,
> gt;有一个问题,在做exctly-once的时候,
> gt;checkpoint的配置有什么要求吗?
> gt;还有就是kafka的事务提交多久能提交一次,可配置吗?
> gt;望解答,谢谢各位!
>
>
>
>
>
>
> --nbsp;原始邮件nbsp;--
> 发件人:nbsp;"Matt Wang" 发送时间:nbsp;2020年6月10日(星期三) 晚上7:39
> 收件人:nbsp;"user-zh@flink.apache.org"
> 主题:nbsp;Re:flink精准一次性消费问题
>
>
>
> kafka 从 0.11.0 开始支持事务写,在 flink 中如果开启了 EXACTLY-ONCE,数据会先 send 到 kafka,但在未调用
> commit 之前,这部分数据是数据是属于未完成事务的数据,站在 kafka
> 的角度,数据还是会存储下来的,只不过下游在消费的时候,根据nbsp; isolation.level 设置来决定是否能消费到未 commit
> 的数据。
>
>
> ---
> Best,
> Matt Wang
>
>
> On 06/10/2020 14:28,Yichao Yang<1048262...@qq.comgt; wrote:
> Hi
>
>
> sinkamp;nbsp;
> 为kafka时,需要kafka版本大于1.0,并且kafka端也要开启两阶段提交功能才能满足EXACTLY-ONCE。可以检查下你的配置是否都满足。
>
>
> Best,
> Yichao Yang
>
>
>
>
> --amp;nbsp;原始邮件amp;nbsp;--
> 发件人:amp;nbsp;"胡云川" 发送时间:amp;nbsp;2020年6月10日(星期三) 下午2:25
> 收件人:amp;nbsp;"user-zh"
> 主题:amp;nbsp;flink精准一次性消费问题
>
>
>
> amp;amp;gt;Hi,
> amp;amp;gt;在使用flink往kafka写入数据时,使用了EXACTLY-ONCE,但是
>
> amp;amp;gt;在debug测试的时候,发现数据在invoke方法里的traction.producer.send()的时候数据就已经传过去了,没有通过precommit和commit方法
> amp;amp;gt;请问大家可以解释一下吗?谢谢!


Re: flink 编译

2021-01-12 文章 tison
试试 mvn clean install -DskipTests -pl flink-runtime,flink-dist

Best,
tison.


penguin.  于2021年1月12日周二 下午9:44写道:

> Hi,
>
>
> 请问有人知道怎么单独编译flink-runtime模块吗?
> 然后这样是否能把更改的部分直接在flink-dist包中的org.apache.flink.runtime目录下进行替换?
> 整体编译一次实在太慢了。
> 谢谢!
>
>
> penguin


Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 tison
PerJob 模式下,在作业完全挂掉之后,是会上报 YARN RM 应用自己失败的状态的。

当然,在 FLINK 察觉自己 FAILED 到上报给 YARN 是有一定的时延的,也有可能因为网络等问题上报失败。

你这个是短暂的不一致时间窗口,还是说 FLINK 集群已经退了,YARN 的状态还没有变化呢?

Best,
tison.


zhisheng  于2020年11月12日周四 下午8:17写道:

> 同遇见过这个问题,所以作业监控告警的时候一般都是拿作业所有的 task 级别的状态,而不是简单的 yarn 状态
>
> hdxg1101300123  于2020年11月12日周四 下午8:07写道:
>
> > 可以设置检查点失败任务也失败
> >
> >
> >
> > 发自vivo智能手机
> > > hi everyone,
> > >
> > > 最近在使用Flink-1.11.1 On Yarn Per
> > Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn
> > application仍处于运行状态
> > >
> > > 疑问是Flink任务处于Failed或Finished时,不会反馈自身状态给Yarn吗?期待大佬解惑,谢谢
> > >
> > > best,
> > > amenhub
>


Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 tison
detached 是另一个坑,因为你 attached 的时候需要等 client 去 request status 才会触发状态变化,但是普通的
execute 应该也是会自动的去拉结果的。

可以看下下列关键日志的打印情况

- log.info("Job {} reached globally terminal state {}.", ...)
- LOG.debug("Shutting down cluster because someone retrieved the job
result.");
- LOG.info("Shutting {} down with application status {}. Diagnostics {}.",
...)

Best,
tison.


JasonLee <17610775...@163.com> 于2020年11月13日周五 上午11:22写道:

> hi
> 1,首先确定你提交的是per-job模式吗?
> 2,你说的任务状态是说jm还在任务在failover,还是任务确实是挂了,jm已经退出了?
>
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink算子类在多个subtask中是各自初始化1个实例对象吗?

2020-11-15 文章 tison
可以这么认为,大体上你可以认为每个并发有自己的环境。

技术上,算子对象是每个并发会实例化一个,而 static 变量的【共享】程度跟你设置的 slot per TM
值还有其他一些调度相关的参数有关,但是最好不要依赖这种实现层面的东西。

一种常见的误解是我创建一个 static HashMap 就神奇地拥有了全局的键值存储,这当然是不对的,只有在同一个 JVM 实例上也就是同一个 TM
上的任务才会看到同一个 HashMap 对象,而这几乎是不可控的。

可以看一下这篇文档[1]对物理部署的实际情况有一个基本的认知。

Best,
tison.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html


hl9...@126.com  于2020年11月16日周一 下午1:55写道:

> Hi,all:
>
> flink算子在多个并行度的job中,每个算子(假如1个算子都有一个对应的java类)在多个subtask中会共享1个java类实例吗?还是每个subtask都会各自的实例?
>
> 我做了一个简单测试,写了一个flatmap类,在构造方法和open方法中打印类的toString方法,发现输出都不同,是否可以证明每个subtask都初始化了自己的类实例?
> 希望有朋友能解释下算子在job运行中初始化的过程。
>
> 测试相关代码如下:
> // flink 1.10.2版本,并行度为3
> @Slf4j
> public class PersonFlatMap extends RichFlatMapFunction String>, Person> {
> private transient ValueState state;
>
> public PersonFlatMap(){
> log.info(String.format("PersonFlatMap【%s】:
> 创建实例",this.toString()));
> }
>
> @Override
> public void open(Configuration parameters) throws IOException {
> //略去无关代码...
> log.info(String.format("PersonFlatMap【%s】:初始化状态!",
> this.toString()));
> }
>
> @Override
> public void flatMap(Tuple2 t, Collector
> collector) throws Exception {
> Person p = JSONUtil.toObject(t.f1,Person.class);
> collector.collect(p);
> if(state.value() == null){state.update(0);}
> state.update(state.value() + 1);
> log.info("state: "+state.value());
> }
> }
>
> //测试日志输出
> ...
> flink-10.2 - [2020-11-16 13:41:54.360] - INFO  [main]
> com.toonyoo.operator.PersonFlatMap  -
> PersonFlatMap【com.toonyoo.operator.PersonFlatMap@ba8d91c】: 创建实例
> //此处略去无关日志...
> flink-10.2 - [2020-11-16 13:42:00.326] - INFO  [Flat Map -> Sink: Print to
> Std. Out (1/3)] org.apache.flink.runtime.state.heap.HeapKeyedStateBackend
> - Initializing heap keyed state backend with stream factory.
> flink-10.2 - [2020-11-16 13:42:00.351] - INFO  [Flat Map -> Sink: Print to
> Std. Out (1/3)] com.toonyoo.operator.PersonFlatMap  -
> PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c9d895d】:初始化状态!
> flink-10.2 - [2020-11-16 13:42:00.354] - INFO  [Flat Map -> Sink: Print to
> Std. Out (3/3)] com.toonyoo.operator.PersonFlatMap  -
> PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c489c40】:初始化状态!
> flink-10.2 - [2020-11-16 13:42:00.356] - INFO  [Flat Map -> Sink: Print to
> Std. Out (2/3)] com.toonyoo.operator.PersonFlatMap  -
> PersonFlatMap【com.toonyoo.operator.PersonFlatMap@1cd4f5c9】:初始化状态!
> ...
>
>
>
>
> hl9...@126.com
>


  1   2   >