Re: flink 1.11 sql作业提交JM报错

2020-07-11 文章 Leonard Xu
HI, fulin 能大致贴下代码吗?能复现异常即可。简单说下这两个方法, TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是 DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …) 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job

????: ?????????? richfunction????????????????????????????????????????????

2020-07-11 文章 hdxg1101300...@163.com
???y???l?Fflink?? ?? ??1??map??2?? ??sink??1?? import

回复: 滑动窗口数据存储多份问题

2020-07-11 文章 Jimmy Zhang
Hi,all! 从WindowOperator.java的processElement方法跟进去,使用windowState.add(element.getValue());添加数据,这里面找到add方法的HeapListState类的实现, @Override public void add(V value) { Preconditions.checkNotNull(value, "You cannot add null to a ListState."); final N namespace = currentNamespace; final StateTable>

flink 1.11 es未定义pk的sink问题

2020-07-11 文章 sunfulin
hi, 根据文档[1]的描述,1.11的es sql connector如果在ddl里没有声明primary key,将会使用append模式sink数据,并使用es本身生成的id作为document_id。但是我在测试时发现,如果我的ddl里没有定义primary key,写入时没有正确生成document_id,反而是将index作为id生成了。导致只有最新的一条记录。下面是我的ddl定义: 不确定是我配置使用的方式不对,还是确实存在bug。。 CREATE TABLE ES6_SENSORDATA_OUTPUT ( event varchar, user_id

Re: 代码中如何取消正在运行的Flink Streaming作业

2020-07-11 文章 Jeff Zhang
Zeppelin 能够帮你提交和cancel job,就是通过上面jianxu说的ClusterClient api来做到的,对zeppelin感兴趣的话,可以参考这个视频 https://www.bilibili.com/video/BV1Te411W73b?p=21 jianxu 于2020年7月11日周六 下午4:52写道: > Hi: > > 我想,你可能打算通过API的方式来取消正在运行的流任务。Flink任务提交时需要构建ClusterClient,提交成功后会返回任务对应的JobId。任务取消时,通过调用ClusterClient的cancel(JobID >

Re:flink 1.11 sql作业提交JM报错

2020-07-11 文章 sunfulin
hi, 在JM日志中还有如下异常:这个也比较诡异。求大神帮忙解答下。 java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)

flink 1.11 sql作业提交JM报错

2020-07-11 文章 sunfulin
hi, 我使用flink 1.11提交sql作业,从JM日志中看到有如下异常。我的作业里会通过tEnv.executeSQL执行多个ddl语句,通过tEnv.createStatementSet add多个dml语句,并执行execute。 如下异常可能原因是啥呢?还有个问题,这个异常虽然抛出来了,但是作业还是正常启动执行了。这又是为何?是不是不推荐在作业里同时使用executeSQL和statementset.execute? Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than

Re: 滑动窗口数据存储多份问题

2020-07-11 文章 Congxian Qiu
Hi 你说的 HeapListState 的困惑具体是什么呢? Best, Congxian Jimmy Zhang <13669299...@163.com> 于2020年7月11日周六 下午4:50写道: > 嗯嗯,之前没有选择回复全部,不好意思。 > > 我看源码关于RocksDB这块确实是需要序列化的,所以肯定是多份保存,如果状态后端是heap呢,也是一样的吗?从我测试内存来看,感觉也是多份,只是heapliststate那个类给了我一些困惑 > > > 在2020年07月11日 16:23,Congxian Qiu 写道: > Hi > > >

回复: 代码中如何取消正在运行的Flink Streaming作业

2020-07-11 文章 jianxu
Hi: 我想,你可能打算通过API的方式来取消正在运行的流任务。Flink任务提交时需要构建ClusterClient,提交成功后会返回任务对应的JobId。任务取消时,通过调用ClusterClient的cancel(JobID jobId)取消流任务。 Flink源码可以看看 CliFrontend[1]中的逻辑,如果觉得比较麻烦可以参考https://github.com/todd5167/flink-spark-submiter项目的任务提交部分,取消任务时构建ClusterClient即可。 | | jianxu | |

回复:滑动窗口数据存储多份问题

2020-07-11 文章 Jimmy Zhang
嗯嗯,之前没有选择回复全部,不好意思。 我看源码关于RocksDB这块确实是需要序列化的,所以肯定是多份保存,如果状态后端是heap呢,也是一样的吗?从我测试内存来看,感觉也是多份,只是heapliststate那个类给了我一些困惑 在2020年07月11日 16:23,Congxian Qiu 写道: Hi 每个窗口都是一个单独的 state,至于你认为的不同 state 仅保持引用是不对的。这个你可以使用 RocksDBStateBackend 来考虑,RocksDBStateBackend 中会把 state 序列化成 bytes,然后写到 RocksDB 中,就是每个

Re: 滑动窗口数据存储多份问题

2020-07-11 文章 Congxian Qiu
Hi 每个窗口都是一个单独的 state,至于你认为的不同 state 仅保持引用是不对的。这个你可以使用 RocksDBStateBackend 来考虑,RocksDBStateBackend 中会把 state 序列化成 bytes,然后写到 RocksDB 中,就是每个 State 中都会有一份。 PS:回复邮件的时候可以选择「全部回复」这样就能够加上 "user-zh@flink.apache.org"),这样我们的邮件所有人都能看到了 Best, Congxian 张浩 于2020年7月7日周二 上午10:34写道: > >

Re: 代码中如何取消正在运行的Flink Streaming作业

2020-07-11 文章 Congxian Qiu
Hi 如果你是想做一个作业管理的平台,可以尝试看一下 CliFrontend[1] 中相关的逻辑,对于 On Yarn 的作业,简单地说你需要能够正确的初始化一个 client 和 Yarn RM 交互,然后你需要知道 applicationId,另外你还需要知道 flink 的 JobId,接下来就是调用 Flink 的接口了 如果像更多的了解参数如从和命令行传到 java 代码的,你可以自己写一个单元测试,单步调试一下整个流程。 [1]

Re: 作业从flink1.9.0迁移到1.10.1,LogicalTypeRoot变更后无法从CP恢复:No enum constant org.apache.flink.table.types.logical.LogicalTypeRoot.ANY

2020-07-11 文章 Congxian Qiu
Hi 从错误栈来看,是因为 `No enum constant org.apache.flink.table.types.logical.LogicalTypeRoot.ANY ` 这个导致的无法正常 restore。首先你需要看下是否是大版本的升级(像 Jark 说的那样),如果是小版本的升级,你需要看下为什么找不到这个 LocigcalTypeRoot.ANY. PS: 贴代码/错误栈可以使用 gist[1] 或者 pastebin[2] 这样的服务,现在邮件里看到的栈信息没有很好的分行 [1] https://gist.github.com/ [2]

Re: flink 1.11 local execution oom问题

2020-07-11 文章 Congxian Qiu
Hi 这个问题可以看下是否和 releasenote[1] 中 memory configuration 相关的修改有关,具体到这个错误,你可以按照提示增加一些内存看看 [1] https://flink.apache.org/news/2020/07/06/release-1.11.0.html#other-improvements Best, Congxian sunfulin 于2020年7月10日周五 下午11:32写道: > hi, > >