HI, fulin
能大致贴下代码吗?能复现异常即可。简单说下这两个方法,
TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是
DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink
job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …)
的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job
???y???l?Fflink??
??
??1??map??2??
??sink??1??
import
Hi,all!
从WindowOperator.java的processElement方法跟进去,使用windowState.add(element.getValue());添加数据,这里面找到add方法的HeapListState类的实现,
@Override
public void add(V value) {
Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
final N namespace = currentNamespace;
final StateTable>
hi,
根据文档[1]的描述,1.11的es sql connector如果在ddl里没有声明primary
key,将会使用append模式sink数据,并使用es本身生成的id作为document_id。但是我在测试时发现,如果我的ddl里没有定义primary
key,写入时没有正确生成document_id,反而是将index作为id生成了。导致只有最新的一条记录。下面是我的ddl定义:
不确定是我配置使用的方式不对,还是确实存在bug。。
CREATE TABLE ES6_SENSORDATA_OUTPUT (
event varchar,
user_id
Zeppelin 能够帮你提交和cancel job,就是通过上面jianxu说的ClusterClient
api来做到的,对zeppelin感兴趣的话,可以参考这个视频
https://www.bilibili.com/video/BV1Te411W73b?p=21
jianxu 于2020年7月11日周六 下午4:52写道:
> Hi:
>
> 我想,你可能打算通过API的方式来取消正在运行的流任务。Flink任务提交时需要构建ClusterClient,提交成功后会返回任务对应的JobId。任务取消时,通过调用ClusterClient的cancel(JobID
>
hi,
在JM日志中还有如下异常:这个也比较诡异。求大神帮忙解答下。
java.lang.IllegalStateException: No operators defined in streaming topology.
Cannot execute.
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)
hi,
我使用flink
1.11提交sql作业,从JM日志中看到有如下异常。我的作业里会通过tEnv.executeSQL执行多个ddl语句,通过tEnv.createStatementSet
add多个dml语句,并执行execute。
如下异常可能原因是啥呢?还有个问题,这个异常虽然抛出来了,但是作业还是正常启动执行了。这又是为何?是不是不推荐在作业里同时使用executeSQL和statementset.execute?
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than
Hi
你说的 HeapListState 的困惑具体是什么呢?
Best,
Congxian
Jimmy Zhang <13669299...@163.com> 于2020年7月11日周六 下午4:50写道:
> 嗯嗯,之前没有选择回复全部,不好意思。
>
> 我看源码关于RocksDB这块确实是需要序列化的,所以肯定是多份保存,如果状态后端是heap呢,也是一样的吗?从我测试内存来看,感觉也是多份,只是heapliststate那个类给了我一些困惑
>
>
> 在2020年07月11日 16:23,Congxian Qiu 写道:
> Hi
>
>
>
Hi:
我想,你可能打算通过API的方式来取消正在运行的流任务。Flink任务提交时需要构建ClusterClient,提交成功后会返回任务对应的JobId。任务取消时,通过调用ClusterClient的cancel(JobID
jobId)取消流任务。
Flink源码可以看看
CliFrontend[1]中的逻辑,如果觉得比较麻烦可以参考https://github.com/todd5167/flink-spark-submiter项目的任务提交部分,取消任务时构建ClusterClient即可。
| |
jianxu
|
|
嗯嗯,之前没有选择回复全部,不好意思。
我看源码关于RocksDB这块确实是需要序列化的,所以肯定是多份保存,如果状态后端是heap呢,也是一样的吗?从我测试内存来看,感觉也是多份,只是heapliststate那个类给了我一些困惑
在2020年07月11日 16:23,Congxian Qiu 写道:
Hi
每个窗口都是一个单独的 state,至于你认为的不同 state 仅保持引用是不对的。这个你可以使用 RocksDBStateBackend
来考虑,RocksDBStateBackend 中会把 state 序列化成 bytes,然后写到 RocksDB 中,就是每个
Hi
每个窗口都是一个单独的 state,至于你认为的不同 state 仅保持引用是不对的。这个你可以使用 RocksDBStateBackend
来考虑,RocksDBStateBackend 中会把 state 序列化成 bytes,然后写到 RocksDB 中,就是每个 State
中都会有一份。
PS:回复邮件的时候可以选择「全部回复」这样就能够加上 "user-zh@flink.apache.org"),这样我们的邮件所有人都能看到了
Best,
Congxian
张浩 于2020年7月7日周二 上午10:34写道:
>
>
Hi
如果你是想做一个作业管理的平台,可以尝试看一下 CliFrontend[1] 中相关的逻辑,对于 On Yarn
的作业,简单地说你需要能够正确的初始化一个 client 和 Yarn RM 交互,然后你需要知道 applicationId,另外你还需要知道
flink 的 JobId,接下来就是调用 Flink 的接口了
如果像更多的了解参数如从和命令行传到 java 代码的,你可以自己写一个单元测试,单步调试一下整个流程。
[1]
Hi
从错误栈来看,是因为 `No enum constant
org.apache.flink.table.types.logical.LogicalTypeRoot.ANY ` 这个导致的无法正常
restore。首先你需要看下是否是大版本的升级(像 Jark 说的那样),如果是小版本的升级,你需要看下为什么找不到这个
LocigcalTypeRoot.ANY.
PS: 贴代码/错误栈可以使用 gist[1] 或者 pastebin[2] 这样的服务,现在邮件里看到的栈信息没有很好的分行
[1] https://gist.github.com/
[2]
Hi
这个问题可以看下是否和 releasenote[1] 中 memory configuration
相关的修改有关,具体到这个错误,你可以按照提示增加一些内存看看
[1]
https://flink.apache.org/news/2020/07/06/release-1.11.0.html#other-improvements
Best,
Congxian
sunfulin 于2020年7月10日周五 下午11:32写道:
> hi,
>
>
14 matches
Mail list logo