Re: flink基于源码如何编译打包生成flink-table-blink.jar

2020-09-28 文章 Xingbo Huang
Hi XiaChang

你可以在flink-table目录下执行打包命令。然后flink-table-uber-blink的target目录下生成的flink-table-uber-blink_2.11-1.12-SNAPSHOT.jar这个包就是你要的flink-table-blink_2.11-1.12-SNAPSHOT.jar

Best,
Xingbo

zilong xiao  于2020年9月29日周二 上午11:56写道:

> Hi XiaChang
>
>  
> 你可以对整个Flink工程打包,然后在flink-dist/target/flink-${version}-bin/flink-${version}/lib中找到,希望对你有帮助~
>
> 祝好
>
> XiaChang <13628620...@163.com> 于2020年9月29日周二 上午10:46写道:
>
> > 基于flink源码 如何编译打包生成flink-table-blink.jar
> >
> > 源码中,flink-table是多模块的,正常打包(mvn clean install -DskipTests
> > -Dfast)生成的是每个模块单独的jar,而不是flink-table-blink.jar
> >
> > 请问如何打包才能生成flink-table-blink.jar
> >
> >
> >
> >
> >
> >
>


Re: Re: flink使用在docker环境中部署出现的两个问题

2020-09-28 文章 cxydeve...@163.com
好的明白,谢谢大佬



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: checkpoint rocksdb hdfs 如何协调,保证数据不丢失

2020-09-28 文章 Congxian Qiu
Hi
   RocksDB 里面存的是 State,Flink 在做 checkpoint 的时候会把 State 备份到 HDFS 上,如果失败的话从
Checkpoint 进行恢复,如果想了解更详细的内容,可以参考文档[1][2] 以及文档里面的链接
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/checkpointing.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html

Best,
Congxian


Michael Ran  于2020年9月29日周二 上午11:06写道:

> dear all :
> 我们checkpoint 信息默认保存在rocksdb上,但是rocksdb
> 是一个单机系统,性能OK,要做到不丢失还是要备份到hdfs分布式文件系统上。
>
>
>问题是:
>1. 如果仅保存在rocksdb,那么单机物理损坏,数据是会丢失的。
>2. 如果仅保存hdfs,那么性能可能跟不上
>3.如果先保存到rocksdb,再定时备份到hdfs,那么是多久备份一次?中间出现物理损坏,还是会出现一端时间的丢失。
>4. 这块的详细设计,和具体流程、场景有合适的文档推荐吗?怎么再性能和数据完整性上做考虑的


回复: Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-28 文章 xiao cai
非常感谢建议,有zeeplin api的相关文档吗




 原始邮件 
发件人: chengyanan1...@foxmail.com
收件人: user-zh
发送时间: 2020年9月29日(周二) 09:54
主题: 回复: Re: 怎么样在Flink中使用java代码提交job到yarn


我们项目中也是用到了这个,我也是暂时采用的捕获日志来解析得到yarn application id 和 flink job id的 
后期重点研究一下zeeplin,或许可以修改一下源码来镶嵌到我们自己的系统中或者直接调用zeeplin的api 发件人: xushanshan 发送时间: 
2020-09-25 16:42 收件人: user-zh 主题: Re: 怎么样在Flink中使用java代码提交job到yarn 
可以捕获控制台打印出来的日志,flink相关日志的格式很固定,字符串截取就能获得 yarn application id 和 flink job id > 在 
2020年9月25日,下午4:23,xiao cai  写道: > > Hi all: > 大家好,我目前遇到一个flink 
任务提交方面的困扰: > 想要在自己的项目中(springboot)提交flink 
job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包
 > 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 > 非常感谢 > > > 
best, > xiao

Re: flink基于源码如何编译打包生成flink-table-blink.jar

2020-09-28 文章 zilong xiao
Hi XiaChang
 
你可以对整个Flink工程打包,然后在flink-dist/target/flink-${version}-bin/flink-${version}/lib中找到,希望对你有帮助~

祝好

XiaChang <13628620...@163.com> 于2020年9月29日周二 上午10:46写道:

> 基于flink源码 如何编译打包生成flink-table-blink.jar
>
> 源码中,flink-table是多模块的,正常打包(mvn clean install -DskipTests
> -Dfast)生成的是每个模块单独的jar,而不是flink-table-blink.jar
>
> 请问如何打包才能生成flink-table-blink.jar
>
>
>
>
>
>


Re: Re: flink使用在docker环境中部署出现的两个问题

2020-09-28 文章 Yang Wang
你可以自己写一个脚本,调用docker-entrypoint.sh,然后将输出重定向就可以
>/opt/flink/log/taskmanager.out 2>/opt/flink/log/taskmanager.err

docker run的时候通过--entrypoint设置为你新的脚本就好了


Best,
Yang

chenxuying  于2020年9月28日周一 下午3:37写道:

> 请问一下第一个问题您说的修改启动命令,有例子吗,或者什么文档可以参考
>
>
>
>
> 在 2020-09-28 12:50:25,"Yang Wang"  写道:
> >第一个问题,因为默认情况下JM/TM进程的STDOUT已经输出到console,所以是没有办公通过webui查看STDOUT输出的
> >可以通过docker logs来查看,当然你也可以修改启动命令,把STDOUT重定向到具体的文件
> >
>
> >第二个问题,在JobManager和TaskManager的docker-entrypoint.sh[1]里面会修改flink-conf.yaml的,
> >所以你mount进去会被修改掉
> >
> >[1].
> >https://github.com/apache/flink-docker/blob/dev-1.11/docker-entrypoint.sh
> >
> >
> >Best,
> >Yang
> >
> >chenxuying  于2020年9月27日周日 下午7:56写道:
> >
> >> 根据官网[1]使用docker部署flink,session cluster模式
> >> 环境win10+docker+flink1.11.2
> >> cmd命令
> >> docker run ^
> >> -d^
> >> --rm ^
> >> --name=jobmanager ^
> >> --hostname=jobmanager ^
> >> --network flink-network ^
> >> --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" ^
> >> -p 28081:8081 ^
> >> flink:1.11.2-scala_2.11 jobmanager
> >> docker run ^
> >> -d^
> >> --rm ^
> >> --name=taskmanager ^
> >> --hostname=taskmanager ^
> >> --network flink-network ^
> >> --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" ^
> >> flink:1.11.2-scala_2.11 taskmanager
> >>
> >>
> >> 问题一:
> >> 在webui查看任务输出stdout提示找不到输出文件
> >> java.util.concurrent.CompletionException:
> >> org.apache.flink.util.FlinkException: The file STDOUT does not exist on
> the
> >> TaskExecutor.
> >> at
> >>
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
> >> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> >> at
> >>
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> >> ~[?:1.8.0_265]
> >> at
> >>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> ~[?:1.8.0_265]
> >> at
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> ~[?:1.8.0_265]
> >> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
> >> Caused by: org.apache.flink.util.FlinkException: The file STDOUT does
> not
> >> exist on the TaskExecutor.
> >> ... 5 more
> >> 2020-09-27 09:04:33,370 ERROR
> >>
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler
> >> [] - Unhandled exception.
> >> org.apache.flink.util.FlinkException: The file STDOUT does not exist on
> >> the TaskExecutor.
> >> at
> >>
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
> >> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> >> at
> >>
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> >> ~[?:1.8.0_265]
> >> at
> >>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> ~[?:1.8.0_265]
> >> at
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> ~[?:1.8.0_265]
> >> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
> >>
> >>
> >>
> >>
> >> 问题二:
> >> mount的src的配置文件要多份吗
> >> 因为我把env替换成mount,如下
> >> docker run ^
> >> -d^
> >> --rm ^
> >> --name=jobmanager ^
> >> --network flink-network ^
> >> --mount
> type=bind,src=D:/cxy/soft/flink-1.11.2/conf,target=/opt/flink/conf
> >> ^
> >> -p 28081:8081 ^
> >> flink:1.11.2-scala_2.11 jobmanager
> >>
> >>
> >> docker run ^
> >> -d^
> >> --rm ^
> >> --name=taskmanager ^
> >> --network flink-network ^
> >> --mount
> type=bind,src=D:/cxy/soft/flink-1.11.2/conf,target=/opt/flink/conf
> >> ^
> >> flink:1.11.2-scala_2.11 taskmanager
> >>
> >>
> >> 结果发现webui上的可用Task Managers为0
> >>
> 每次执行命令的时候都会把mount配置的src下flink-conf.yaml中的jobmanager.rpc.address替换成了新的容器ip
> >>
> >>
> 我猜应该是这个原因导致启动taskmanager的时候jobmanager.rpc.address替换成了taskmanager的ip.所以没有Task可用
> >> 想问下大佬们,是我哪一步出现问题了吗
> >>
> >>
> >> [1]:
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/docker.html
>


checkpoint rocksdb hdfs 如何协调,保证数据不丢失

2020-09-28 文章 Michael Ran
dear all :
我们checkpoint 信息默认保存在rocksdb上,但是rocksdb 
是一个单机系统,性能OK,要做到不丢失还是要备份到hdfs分布式文件系统上。


   问题是:
   1. 如果仅保存在rocksdb,那么单机物理损坏,数据是会丢失的。
   2. 如果仅保存hdfs,那么性能可能跟不上
   3.如果先保存到rocksdb,再定时备份到hdfs,那么是多久备份一次?中间出现物理损坏,还是会出现一端时间的丢失。
   4. 这块的详细设计,和具体流程、场景有合适的文档推荐吗?怎么再性能和数据完整性上做考虑的

flink基于源码如何编译打包生成flink-table-blink.jar

2020-09-28 文章 XiaChang
基于flink源码 如何编译打包生成flink-table-blink.jar

源码中,flink-table是多模块的,正常打包(mvn clean install -DskipTests 
-Dfast)生成的是每个模块单独的jar,而不是flink-table-blink.jar   


请问如何打包才能生成flink-table-blink.jar



Re:Re: Re: 了解下大家生产中都用什么任务调度系统呢

2020-09-28 文章 Michael Ran
~。~ hera、海豚都行
在 2020-09-29 09:58:45,"chengyanan1...@foxmail.com"  
写道:
>
>Apache DolphinScheduler 你值得拥有
>
>https://dolphinscheduler.apache.org/zh-cn/
>
>
> 
>发件人: 赵一旦
>发送时间: 2020-09-28 20:47
>收件人: user-zh
>主题: Re: 了解下大家生产中都用什么任务调度系统呢
>感觉ooize成熟但不想用,xml写起来难受。
>azkaban也需要单独上传。
> 
>我现在用的公司的其实挺好,就是界面操作有点小bug,以及经常挂。就是全流程通过界面操作,创建任务,然后任务和任务之间拖一拖就可以创建依赖。
> 
>孟小鹏 <602012...@qq.com> 于2020年9月28日周一 下午8:22写道:
> 
>> Airflow  oozie
>>
>>
>>
>>
>>
>> 发自我的iPhone
>>
>>
>> -- 原始邮件 --
>> 发件人: 赵一旦 > 发送时间: 2020年9月28日 19:41
>> 收件人: user-zh > 主题: 回复:了解下大家生产中都用什么任务调度系统呢
>>
>>
>>
>> 主要是指开源的调度系统。
>>
>> 公司有个系统感觉经常挂,想换个开源的自己搭建。
>> (1)最好是支持WEB UI配置任务流的,不想搞个任务还需要单独写xml等文件然后打包上传那种。
>> (2)在生产中长期应用,稳定,能满足大多数需求的。
>>
>> 希望大家推荐下。


Re: Re: 了解下大家生产中都用什么任务调度系统呢

2020-09-28 文章 chengyanan1...@foxmail.com

Apache DolphinScheduler 你值得拥有

https://dolphinscheduler.apache.org/zh-cn/


 
发件人: 赵一旦
发送时间: 2020-09-28 20:47
收件人: user-zh
主题: Re: 了解下大家生产中都用什么任务调度系统呢
感觉ooize成熟但不想用,xml写起来难受。
azkaban也需要单独上传。
 
我现在用的公司的其实挺好,就是界面操作有点小bug,以及经常挂。就是全流程通过界面操作,创建任务,然后任务和任务之间拖一拖就可以创建依赖。
 
孟小鹏 <602012...@qq.com> 于2020年9月28日周一 下午8:22写道:
 
> Airflow  oozie
>
>
>
>
>
> 发自我的iPhone
>
>
> -- 原始邮件 --
> 发件人: 赵一旦  发送时间: 2020年9月28日 19:41
> 收件人: user-zh  主题: 回复:了解下大家生产中都用什么任务调度系统呢
>
>
>
> 主要是指开源的调度系统。
>
> 公司有个系统感觉经常挂,想换个开源的自己搭建。
> (1)最好是支持WEB UI配置任务流的,不想搞个任务还需要单独写xml等文件然后打包上传那种。
> (2)在生产中长期应用,稳定,能满足大多数需求的。
>
> 希望大家推荐下。


Re: FlinkSQL是否支持设置窗口trigger实现continuious trigger呢

2020-09-28 文章 赵一旦
good. 懂啦。

hailongwang <18868816...@163.com> 于2020年9月29日周二 上午12:16写道:

>
>
>
> 可以使用
> tableConfig.getConfiguration.set('table.exec.emit.early-fire.enabled',
> true)
> tableConfig.getConfiguration.set('table.exec.emit.early-fire.delay', '10
> s')
>
>
>  祝好
> Hailong Wang
> 在 2020-09-27 12:25:37,"赵一旦"  写道:
> >如题,不使用DatastreamAPI,使用FlinkSQL能否实现五分钟窗口,每10秒输出一次呢?
>


回复: Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-28 文章 chengyanan1...@foxmail.com
我们项目中也是用到了这个,我也是暂时采用的捕获日志来解析得到yarn application id 和 flink job id的
后期重点研究一下zeeplin,或许可以修改一下源码来镶嵌到我们自己的系统中或者直接调用zeeplin的api



 
发件人: xushanshan
发送时间: 2020-09-25 16:42
收件人: user-zh
主题: Re: 怎么样在Flink中使用java代码提交job到yarn
可以捕获控制台打印出来的日志,flink相关日志的格式很固定,字符串截取就能获得 yarn application id 和 flink job id
 
> 在 2020年9月25日,下午4:23,xiao cai  写道:
> 
> Hi all:
> 大家好,我目前遇到一个flink 任务提交方面的困扰:
> 想要在自己的项目中(springboot)提交flink 
> job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包
> 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。
> 非常感谢
> 
> 
> best,
> xiao
 


Re:flunk - checkpoint

2020-09-28 文章 hailongwang
Source 算子的Checkpoint 是由 CheckpointCoordinator 
周期性触发的(时间周期是自己设置的参数),然后下面的算子等到收到上游所有并发的barrier时,触发自己的Checkpoint(PS:1.11 目前 实现了 
unaligned Checkpoint,无需对齐)。当Master 收到所有算子的 Checkpoint 完成后,会 
notifyCheckpointComplete。
希望对你有帮助。
祝好。
Hailong Wang





在 2020-09-28 09:34:17,"郝文强" <18846086...@163.com> 写道:
>
>
>checkpoint 到底是在什么时间点触发的呢?
>我看过官方关于checkpoint的文档,但还是说不清 checkpoint 是什么时候触发的求大佬指点
>| |
>郝文强
>|
>|
>18846086...@163.com
>|
>签名由网易邮箱大师定制
>


Re:FlinkSQL是否支持设置窗口trigger实现continuious trigger呢

2020-09-28 文章 hailongwang



可以使用
tableConfig.getConfiguration.set('table.exec.emit.early-fire.enabled', true)
tableConfig.getConfiguration.set('table.exec.emit.early-fire.delay', '10 s')


 祝好
Hailong Wang
在 2020-09-27 12:25:37,"赵一旦"  写道:
>如题,不使用DatastreamAPI,使用FlinkSQL能否实现五分钟窗口,每10秒输出一次呢?


Re:当kafka有大量数据积压并且flink冷启动,flink端读取kafka有没有限流的参数?

2020-09-28 文章 hailongwang



如果使用 代码编写的话,可以调用 FlinkKafkaConsumer010#setRateLimiter,其中可以使用 
GuavaFlinkConnectorRateLimiter 类。
目前也有一个相关的issue:https://issues.apache.org/jira/browse/FLINK-17919
祝好
Hailong Wang
在 2020-09-27 16:22:00,"me"  写道:
>当kafka有大量数据积压并且flink冷启动,flink端读取kafka有没有限流的参数?
>kafka积压了2亿数据,flink启动后接着之前的状态读取,怎么控制flink的限流?


Re: flink sql 更新mysql字段

2020-09-28 文章 Leonard Xu
Hi

Insert 到指定字段是个通用的需求,社区已经有一个issue[1] 在跟踪了,你可以关注下


祝好
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-18726 
 

> 在 2020年9月28日,17:46,lemon  写道:
> 
> hi各位:
> 请问一下,如果mysql表中有20个字段,现在有多个insert into的语句分别更新指定字段,即同一条记录可能有多个insert语句去分别更新不同字段
> 现在遇到的问题是,因为在insert into语句中需要将mysql中所有字段都带上,所以更新会覆盖其他字段的值。
> 例如insert into mysql select a,b c from 
> kafka,但是我只要更新a,b字段,c字段想保持原来的值,请问这种情况需要怎么操作?
> flink1.10.1版本 blink




Re: 了解下大家生产中都用什么任务调度系统呢

2020-09-28 文章 赵一旦
感觉ooize成熟但不想用,xml写起来难受。
azkaban也需要单独上传。

我现在用的公司的其实挺好,就是界面操作有点小bug,以及经常挂。就是全流程通过界面操作,创建任务,然后任务和任务之间拖一拖就可以创建依赖。

孟小鹏 <602012...@qq.com> 于2020年9月28日周一 下午8:22写道:

> Airflow  oozie
>
>
>
>
>
> 发自我的iPhone
>
>
> -- 原始邮件 --
> 发件人: 赵一旦  发送时间: 2020年9月28日 19:41
> 收件人: user-zh  主题: 回复:了解下大家生产中都用什么任务调度系统呢
>
>
>
> 主要是指开源的调度系统。
>
> 公司有个系统感觉经常挂,想换个开源的自己搭建。
> (1)最好是支持WEB UI配置任务流的,不想搞个任务还需要单独写xml等文件然后打包上传那种。
> (2)在生产中长期应用,稳定,能满足大多数需求的。
>
> 希望大家推荐下。


回复:了解下大家生产中都用什么任务调度系统呢

2020-09-28 文章 孟小鹏
Airflow  oozie





发自我的iPhone


-- 原始邮件 --
发件人: 赵一旦 

了解下大家生产中都用什么任务调度系统呢

2020-09-28 文章 赵一旦
主要是指开源的调度系统。

公司有个系统感觉经常挂,想换个开源的自己搭建。
(1)最好是支持WEB UI配置任务流的,不想搞个任务还需要单独写xml等文件然后打包上传那种。
(2)在生产中长期应用,稳定,能满足大多数需求的。

希望大家推荐下。


Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

2020-09-28 文章 shizk233
flink sql似乎不能设置rebalance,在Data Stream API可以设。

一种思路是拆分sql逻辑,在Data Stream API上创建kafka source后再reblance成一张新表,再做后续dml sql。

另一种思路就是kafka topic增加一下分区

Asahi Lee <978466...@qq.com> 于2020年9月28日周一 下午1:56写道:

> 你好! 使用flink
> SQL,如何设置rebalance呢?--原始邮件--
> 发件人:zilongnbsp;xiao 发送时间:2020年9月27日(星期天) 晚上6:27
> 收件人:user-zh 主题:Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出


Re: 当kafka有大量数据积压并且flink冷启动,flink端读取kafka有没有限流的参数?

2020-09-28 文章 shizk233
应该是没有的,可以自己改造下kafka source来实现。
话说flink自动反压流控不能满足场景需要吗

me  于2020年9月27日周日 下午5:45写道:

> flink版本1.11
> flink连接kafka使用的是 flink  addSource特性
>
>
>  原始邮件
> 发件人: me
> 收件人: user-zh
> 发送时间: 2020年9月27日(周日) 17:22
> 主题: 当kafka有大量数据积压并且flink冷启动,flink端读取kafka有没有限流的参数?
>
>
> 当kafka有大量数据积压并且flink冷启动,flink端读取kafka有没有限流的参数?
> kafka积压了2亿数据,flink启动后接着之前的状态读取,怎么控制flink的限流?


回复: Re: sql-cli执行sql报错

2020-09-28 文章 hl9...@126.com
按照您的方法重试了下,又报了另一个错误:
Flink SQL> CREATE TABLE tx ( 
> account_id  BIGINT, 
> amount  BIGINT, 
> transaction_time TIMESTAMP(3), 
> WATERMARK FOR transaction_time AS transaction_time - 
> INTERVAL '5' SECOND 
> ) WITH ( 
> 'connector.type' = 'kafka', 
> 'connector.version' = 'universal',
> 'connector.topic' = 'heli01', 
> 'connector.properties.group.id' = 'heli-test',
> 'connector.properties.bootstrap.servers' = 
> '10.100.51.56:9092', 
> 'connector.startup-mode' = 'earliest-offset',
> 'format.type'= 'csv' 
> );
[INFO] Table has been created.

Flink SQL> show tables ;
tx

Flink SQL> select * from tx ;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: 
org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance 
of 
org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer

附:lib包清单
[test@rcx51101 lib]$ pwd
/opt/flink-1.10.2/lib

flink-csv-1.10.2.jar
flink-dist_2.12-1.10.2.jar
flink-jdbc_2.12-1.10.2.jar
flink-json-1.10.2.jar
flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
flink-sql-connector-kafka_2.11-1.10.2.jar
flink-table_2.12-1.10.2.jar
flink-table-blink_2.12-1.10.2.jar
log4j-1.2.17.jar
mysql-connector-java-5.1.48.jar
slf4j-log4j12-1.7.15.jar




hl9...@126.com
 
发件人: Leonard Xu
发送时间: 2020-09-28 16:36
收件人: user-zh
主题: Re: sql-cli执行sql报错
Hi
benchao的回复是的对的,
你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包 
flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。
 
 
> 相关lib包:
> flink-connector-kafka_2.12-1.10.2.jar
> kafka-clients-0.11.0.3.jar  
 
祝好
Leonard 


Re: FlinkSQL是否支持设置窗口trigger实现continuious trigger呢

2020-09-28 文章 刘建刚
提供另外一种思路:内层是10s的翻滚窗口,外层接一个按5分钟为key的group by。为防止状态过大,可以设置ttl。简单demo如下:
SELECT *
FROM (SELECT TUMBLE_START(proctime, INTERVAL '10' SECOND) AS st, *
 FROM *
 GROUP BY TUMBLE(proctime, INTERVAL '10' SECOND)
   )
GROUP BY st / (5 * 60 * 1000)

赵一旦  于2020年9月27日周日 下午5:45写道:

> Benchao Li那个我会考虑下,主要是了解下,从datastream转过来,想结合业务看看有多少任务是sql也能实现的。
> silence这个不清楚你表达啥意思,统计需求是五分钟粒度的,不通过窗口咋搞。
> 难道是指基于ts人工计算所属窗口w,然后group by w, 其他key  这样嘛。
>
>
>
> silence  于2020年9月27日周日 下午5:37写道:
>
> > 也可以通过普通的非窗口聚合进行实现吧,minibatch设大点
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: sql-cli执行sql报错

2020-09-28 文章 Benchao Li
(1) 的方式相当于一个shade之后的包,会把所有compile的依赖都打进去。
(2) 的方式的话,你需要自己手工添加所有这个connector的依赖,比如你提到的kafka-clients。
而且,kafka-clients本身的依赖如果你没有打到kafka-clients这个包里面的话,那你也需要把
那些compile依赖也都放进来。所以相当于手工做了一遍maven的依赖处理,而且要想全部都
放进来,应该会有很多。

如果你对kafka-clients有修改,建议自己重新依赖自己修改后的kafka-clients打包一个kafka-sql-connector-kafka

赵一旦  于2020年9月28日周一 下午5:51写道:

>
> 看了下pom,在flink-sql-connector-kafka中依赖了flink-connector-kafka-**,该包又依赖了flink-connector-kafka-base-**以及kafka-client。
> 然后flink-sql-connector-kafka做了shade。
>
> 所以看下来,我的那个(1)和(2)理论上效果是一样的。
> 
>
> 顺便讲下,我kafka-clients更换了ssl证书读取方式,用于支持hdfs等分布式协议(直接复用了flink-core中的filesystem实现)。
>
> 赵一旦  于2020年9月28日周一 下午5:42写道:
>
> >
> 这个不是很懂,(1)flink-connector-kafka_2.11-1.11.2.jar+flink-connector-kafka-base_2.11-1.11.2.jar+kafka-clients-0.11.0.3.jar
> > 和(2)flink-sql-connector-kafka**.jar是啥区别呢?
> >
> > 使用(1)可以不?因为我的kafka-clients部分是调整了源码的。
> >
> > Leonard Xu  于2020年9月28日周一 下午4:36写道:
> >
> >> Hi
> >> benchao的回复是的对的,
> >> 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包
> >> flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。
> >>
> >>
> >> > 相关lib包:
> >> > flink-connector-kafka_2.12-1.10.2.jar
> >> > kafka-clients-0.11.0.3.jar
> >>
> >> 祝好
> >> Leonard
> >
> >
>


-- 

Best,
Benchao Li


Re: flink sql 更新mysql字段

2020-09-28 文章 赵一旦
不清楚flink是否支持insert into t(a,b,c)这样,貌似没看到支持。

不过你这个解决方法还是有的,就是同一个mysql表你可以对应到多个虚拟表比如t1,t2。
t1只定义其中几个字段。t2定义另外几个字段。
此处t1和t2指的是在FlinkSQL中定义表。

lemon  于2020年9月28日周一 下午5:47写道:

> hi各位:
> 请问一下,如果mysql表中有20个字段,现在有多个insert
> into的语句分别更新指定字段,即同一条记录可能有多个insert语句去分别更新不同字段
> 现在遇到的问题是,因为在insert into语句中需要将mysql中所有字段都带上,所以更新会覆盖其他字段的值。
> 例如insert into mysql select a,b c from
> kafka,但是我只要更新a,b字段,c字段想保持原来的值,请问这种情况需要怎么操作?
> flink1.10.1版本 blink


Re: sql-cli执行sql报错

2020-09-28 文章 赵一旦
看了下pom,在flink-sql-connector-kafka中依赖了flink-connector-kafka-**,该包又依赖了flink-connector-kafka-base-**以及kafka-client。
然后flink-sql-connector-kafka做了shade。

所以看下来,我的那个(1)和(2)理论上效果是一样的。

顺便讲下,我kafka-clients更换了ssl证书读取方式,用于支持hdfs等分布式协议(直接复用了flink-core中的filesystem实现)。

赵一旦  于2020年9月28日周一 下午5:42写道:

> 这个不是很懂,(1)flink-connector-kafka_2.11-1.11.2.jar+flink-connector-kafka-base_2.11-1.11.2.jar+kafka-clients-0.11.0.3.jar
> 和(2)flink-sql-connector-kafka**.jar是啥区别呢?
>
> 使用(1)可以不?因为我的kafka-clients部分是调整了源码的。
>
> Leonard Xu  于2020年9月28日周一 下午4:36写道:
>
>> Hi
>> benchao的回复是的对的,
>> 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包
>> flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。
>>
>>
>> > 相关lib包:
>> > flink-connector-kafka_2.12-1.10.2.jar
>> > kafka-clients-0.11.0.3.jar
>>
>> 祝好
>> Leonard
>
>


flink sql ????mysql????

2020-09-28 文章 lemon
hi??
??mysql??20??insert 
into??insert??
insert 
intomysql??
insert into mysql select a,b c from 
kafkaa,b??c??
flink1.10.1 blink

Re: sql-cli执行sql报错

2020-09-28 文章 赵一旦
这个不是很懂,(1)flink-connector-kafka_2.11-1.11.2.jar+flink-connector-kafka-base_2.11-1.11.2.jar+kafka-clients-0.11.0.3.jar
和(2)flink-sql-connector-kafka**.jar是啥区别呢?

使用(1)可以不?因为我的kafka-clients部分是调整了源码的。

Leonard Xu  于2020年9月28日周一 下午4:36写道:

> Hi
> benchao的回复是的对的,
> 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包
> flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。
>
>
> > 相关lib包:
> > flink-connector-kafka_2.12-1.10.2.jar
> > kafka-clients-0.11.0.3.jar
>
> 祝好
> Leonard


Re:Re: Flink 1.11.1 JDBC Sink 使用旧版配置 数据丢失

2020-09-28 文章 Michael Ran
有主键吗?  有的话不会触发delete 才对
在 2020-09-28 15:54:49,"Leonard Xu"  写道:
>
>
>> 在 2020年9月15日,16:52,LittleFall <1578166...@qq.com> 写道:
>> 
>> 谢谢,请问有相关的 issue 链接吗
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>To @LItteFall :
>
>没有对应的issue,因为是在修复changlog 
>issue[1]时在这个issue里一起修复的,代码可以看下TableBufferReducedStatementExecutor里reduceBuffer保证是对同key上得不同操作顺序执行的。
>
>To @Michael Ran:
>
>update 怎么触发的 delete 哦?
>
>LItteFall 是在数据库的表中触发了update操作,然后数据库的binlog通过 CDC工具 canal 以 canal-json 
>格式写入到kafka的表中,一个update 会对应UPDATE_BEFORE,UPDATE_AFTER两条数据, JDBC connector 
>对应的处理会生成两条sql, 一条delete和一条insert. 
> 
>
>祝好
>Leonard
>[1]https://issues.apache.org/jira/projects/FLINK/issues/FLINK-18461?filter=allissues
> 
>
> 


Re: Flink Batch 模式下,The rpc invocation size 113602196 exceeds the maximum akka framesize

2020-09-28 文章 jy l
如果使用了print()等算子,会将上一个task的结果一次全部pull过来,pull时数据超过了akka framesize大小导致。

李加燕  于2020年9月28日周一 下午3:07写道:

> Flink batch 模式消费hdfs上的文件,并做了一个word count
> 操作,但是task一直运行,查看taskmanager的log,发现如下异常:
> java.lang.reflect.UndeclaredThrowableException: null
> at com.sun.proxy.$Proxy35.updateTaskExecutionState(UnknownSource)
> ~[?:?]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1558)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1588)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2400(TaskExecutor.java:173)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1921)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> Causedby: java.io.IOException: The rpc invocation size 113602196 exceeds
> the maximum akka framesize.
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> ... 28 more
> 我有尝试过在flink-conf.yaml配置akka framesize大小为30M,但是还是不能解决上述问题。
> 请求帮助。


Re: 当kafka有大量数据积压并且flink冷启动,flink端读取kafka有没有限流的参数?

2020-09-28 文章 Xiao Xu
Hi, 据我所知没有限流的功能,最简单的是设置下任务的并行度

me  于 2020年9月27日周日 下午5:45写道:

> flink版本1.11
> flink连接kafka使用的是 flink  addSource特性
>
>
>  原始邮件
> 发件人: me
> 收件人: user-zh
> 发送时间: 2020年9月27日(周日) 17:22
> 主题: 当kafka有大量数据积压并且flink冷启动,flink端读取kafka有没有限流的参数?
>
>
> 当kafka有大量数据积压并且flink冷启动,flink端读取kafka有没有限流的参数?
> kafka积压了2亿数据,flink启动后接着之前的状态读取,怎么控制flink的限流?


Re: sql-cli执行sql报错

2020-09-28 文章 Leonard Xu
Hi
benchao的回复是的对的,
你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包 
flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。


> 相关lib包:
> flink-connector-kafka_2.12-1.10.2.jar
> kafka-clients-0.11.0.3.jar  

祝好
Leonard 

Re: Field types of query result and registered TableSink 'table' do not match.

2020-09-28 文章 Leonard Xu
Hi
报错信息是query的schema和sink table 的schema信息不匹配,

>  tumble_start(proctimeField, INTERVAL '10' SECOND) as tum
是保留时间属性的,对应的类型是TimeIndicatorTypeInfo, 如果要转到TIMSTAMP,你可以在输出前, 
cast下tum字段的类型到TIMESTAMP(3)

祝好
Leonard

Re: Re:HistoryServer完成任务丢失的问题

2020-09-28 文章 刘建刚
修复方案为:https://issues.apache.org/jira/browse/FLINK-18959

xiao cai  于2020年9月27日周日 下午6:42写道:

> 貌似是个bug,我的版本是1.11.0
>
>
>
> https://issues.apache.org/jira/browse/FLINK-18959?jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20Bug%20AND%20text%20~%20%22history%20server%22
>
>
>  原始邮件
> 发件人: xiao cai
> 收件人: user-zh
> 发送时间: 2020年9月27日(周日) 18:41
> 主题: Re:Re:HistoryServer完成任务丢失的问题
>
>
> 貌似是个bug
>
>
>  原始邮件
> 发件人: xiao cai
> 收件人: user-zh
> 发送时间: 2020年9月27日(周日) 18:31
> 主题: Re:Re:HistoryServer完成任务丢失的问题
>
>
> 是在history server中没有,但是yarn
> logs还是可以看到的,我理解是任务结束前,jobManager没有upload文件到指定目录,所以history
> server没有拉取到。但是为何没有upload,我通过jobManager的日志也没看出来,没有任何报错。 原始邮件 发件人: Michael
> Ran 收件人: user-zh 发送时间:
> 2020年9月27日(周日) 17:06 主题: Re:Re:HistoryServer完成任务丢失的问题 你的意思是,日志彻底消失了?完全找不到?
> 不会是你任务有问题,压根就没成功,没产生吧。 理论上日志不可能平白无故消失的 在 2020-09-27 17:03:45,"xiao
> cai"  写道: >是的,默认是10s一次,但是这个是去jobmanager的归档目录里拉取的间隔。
> >问题是cancel的那次job,并没有上传日志信息到归档目录里。 > > > 原始邮件 >发件人: Michael Ran<
> greemqq...@163.com> >收件人: user-zh >发送时间:
> 2020年9月27日(周日) 16:45 >主题: Re:HistoryServer完成任务丢失的问题 > > >history
> 记得是定时拉取的,有延迟过去 在 2020-09-27 16:40:27,"xiao cai"  写道:
> >Hi: >flink 1.11.0
> >我启动了一个任务,当这个任务进行checkpoint的时候,我在webui中点击了cancel,任务在yarn中的状态时killed,但是再到history
> server中却找不到这个任务。同时我尝试了再yarn中kill
> application的方式,也是无法发现。这个问题有点随机,不是每次都会出现,有时候执行cancel,在history
> server又能看到。希望了解history serve相关原理的同学给予帮助。 >非常感谢。 > > > > >best, >xiao.


Re: Flink 1.11.1 JDBC Sink 使用旧版配置 数据丢失

2020-09-28 文章 Leonard Xu


> 在 2020年9月15日,16:52,LittleFall <1578166...@qq.com> 写道:
> 
> 谢谢,请问有相关的 issue 链接吗
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

To @LItteFall :

没有对应的issue,因为是在修复changlog 
issue[1]时在这个issue里一起修复的,代码可以看下TableBufferReducedStatementExecutor里reduceBuffer保证是对同key上得不同操作顺序执行的。

To @Michael Ran:

update 怎么触发的 delete 哦?

LItteFall 是在数据库的表中触发了update操作,然后数据库的binlog通过 CDC工具 canal 以 canal-json 
格式写入到kafka的表中,一个update 会对应UPDATE_BEFORE,UPDATE_AFTER两条数据, JDBC connector 
对应的处理会生成两条sql, 一条delete和一条insert. 
 

祝好
Leonard
[1]https://issues.apache.org/jira/projects/FLINK/issues/FLINK-18461?filter=allissues
 

 

flink1.11.2基于官网在k8s上部署是正常的,但是加了volume配置之后报错Read-only file system

2020-09-28 文章 chenxuying
我在使用k8s部署的时候也是按照官网的方式[1],是正常使用的, 然后后面加了volume配置

{

  ...

  "spec": {

...

"template": {

  ...

  "spec": {

"volumes": [

  ...

  {

"name": "libs-volume",

"hostPath": {

  "path": "/data/volumes/flink/jobmanager/cxylib",

  "type": ""

}

  },

  ...

],

"containers": [

  {

...

"volumeMounts": [

  {

"name": "flink-config-volume",

"mountPath": "/opt/flink/conf"

  },

  ...

],

...

  }

],

...

  }

},

...

  },

  ...

}

然后启动jobmanager报错

Starting Job Manager

sed: couldn't open temporary file /opt/flink/conf/sedz0NYKX: Read-only file 
system

sed: couldn't open temporary file /opt/flink/conf/sede6R0BY: Read-only file 
system

/docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create 
/opt/flink/conf/flink-conf.yaml: Permission denied

/docker-entrypoint.sh: 91: /docker-entrypoint.sh: cannot create 
/opt/flink/conf/flink-conf.yaml.tmp: Read-only file system

Starting standalonesession as a console application on host 
flink-jobmanager-66fb98869d-w7plb.

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.

Sep 28, 2020 7:11:14 AM org.apache.hadoop.util.NativeCodeLoader 

WARNING: Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable




[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions

Re: flink在校验DDL属性是否合格时候代码在哪里?

2020-09-28 文章 Leonard Xu
HI

> 在 2020年9月26日,13:26,kcz <573693...@qq.com> 写道:
> 
> 自己写类似的,有太多的if 嵌套了,想学习下,模仿下。

DDL的属性是指WITH里面的属性吗?这个是各个connector校验的,具体的代码在可以先看下各个connector的实现,参考:
DynamicTableSourceFactory DynamicTableSinkFactory 的实现类, 
比如:JdbcDynamicTableFactory

祝好
Leonard

Re:Re: flink使用在docker环境中部署出现的两个问题

2020-09-28 文章 chenxuying
请问一下第一个问题您说的修改启动命令,有例子吗,或者什么文档可以参考




在 2020-09-28 12:50:25,"Yang Wang"  写道:
>第一个问题,因为默认情况下JM/TM进程的STDOUT已经输出到console,所以是没有办公通过webui查看STDOUT输出的
>可以通过docker logs来查看,当然你也可以修改启动命令,把STDOUT重定向到具体的文件
>
>第二个问题,在JobManager和TaskManager的docker-entrypoint.sh[1]里面会修改flink-conf.yaml的,
>所以你mount进去会被修改掉
>
>[1].
>https://github.com/apache/flink-docker/blob/dev-1.11/docker-entrypoint.sh
>
>
>Best,
>Yang
>
>chenxuying  于2020年9月27日周日 下午7:56写道:
>
>> 根据官网[1]使用docker部署flink,session cluster模式
>> 环境win10+docker+flink1.11.2
>> cmd命令
>> docker run ^
>> -d^
>> --rm ^
>> --name=jobmanager ^
>> --hostname=jobmanager ^
>> --network flink-network ^
>> --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" ^
>> -p 28081:8081 ^
>> flink:1.11.2-scala_2.11 jobmanager
>> docker run ^
>> -d^
>> --rm ^
>> --name=taskmanager ^
>> --hostname=taskmanager ^
>> --network flink-network ^
>> --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" ^
>> flink:1.11.2-scala_2.11 taskmanager
>>
>>
>> 问题一:
>> 在webui查看任务输出stdout提示找不到输出文件
>> java.util.concurrent.CompletionException:
>> org.apache.flink.util.FlinkException: The file STDOUT does not exist on the
>> TaskExecutor.
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>> ~[?:1.8.0_265]
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> ~[?:1.8.0_265]
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> ~[?:1.8.0_265]
>> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
>> Caused by: org.apache.flink.util.FlinkException: The file STDOUT does not
>> exist on the TaskExecutor.
>> ... 5 more
>> 2020-09-27 09:04:33,370 ERROR
>> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler
>> [] - Unhandled exception.
>> org.apache.flink.util.FlinkException: The file STDOUT does not exist on
>> the TaskExecutor.
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>> ~[?:1.8.0_265]
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> ~[?:1.8.0_265]
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> ~[?:1.8.0_265]
>> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
>>
>>
>>
>>
>> 问题二:
>> mount的src的配置文件要多份吗
>> 因为我把env替换成mount,如下
>> docker run ^
>> -d^
>> --rm ^
>> --name=jobmanager ^
>> --network flink-network ^
>> --mount type=bind,src=D:/cxy/soft/flink-1.11.2/conf,target=/opt/flink/conf
>> ^
>> -p 28081:8081 ^
>> flink:1.11.2-scala_2.11 jobmanager
>>
>>
>> docker run ^
>> -d^
>> --rm ^
>> --name=taskmanager ^
>> --network flink-network ^
>> --mount type=bind,src=D:/cxy/soft/flink-1.11.2/conf,target=/opt/flink/conf
>> ^
>> flink:1.11.2-scala_2.11 taskmanager
>>
>>
>> 结果发现webui上的可用Task Managers为0
>> 每次执行命令的时候都会把mount配置的src下flink-conf.yaml中的jobmanager.rpc.address替换成了新的容器ip
>>
>> 我猜应该是这个原因导致启动taskmanager的时候jobmanager.rpc.address替换成了taskmanager的ip.所以没有Task可用
>> 想问下大佬们,是我哪一步出现问题了吗
>>
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/docker.html


Re: flink sql 1.11.2 jdbc connector 按月分表读取

2020-09-28 文章 Leonard Xu
Hi
> 是我这边建issue不?
> 
> 这里还发现一个问题 select count(*) from mysql_table 不能执行。
>  

是的,用户都可以在JIRA[1]上建issue的。
不能执行是报错吗?可以把错误信息贴下吗?

祝好
Leonard
[1]https://issues.apache.org/jira/projects/FLINK/issues 
 

Flink Batch 模式下,The rpc invocation size 113602196 exceeds the maximum akka framesize

2020-09-28 文章 李加燕
Flink batch 模式消费hdfs上的文件,并做了一个word count 操作,但是task一直运行,查看taskmanager的log,发现如下异常:
java.lang.reflect.UndeclaredThrowableException: null
at com.sun.proxy.$Proxy35.updateTaskExecutionState(UnknownSource) ~[?:?]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1558)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1588)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2400(TaskExecutor.java:173)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1921)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
Causedby: java.io.IOException: The rpc invocation size 113602196 exceeds the 
maximum akka framesize.
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
... 28 more
我有尝试过在flink-conf.yaml配置akka framesize大小为30M,但是还是不能解决上述问题。
请求帮助。

flink1.8.1处理utc时间方式

2020-09-28 文章 zjfpla...@hotmail.com
请问下各位,flink1.8.1,flink 
sql(非java代码转化)方式下,数据格式'2018-1-2T12:00Z'如何sink到oracle的date字段,to_timestamp函数在1.8中还没有

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

2020-09-28 文章 Shengkai Fang
hi, 你说的是为每个partition生成一个watermark吗? 这样子快流和慢流都会有独立的watermark gererator。
datastream已经支持了该特性, table层正在支持该特性,你可以看看flink-19282的jira。

赵一旦 于2020年9月28日 周一上午11:39写道:

> 我这边负责的作业,一个作业上有2-3个kafka数据源,还包括多个mysql配置流数据源。也是各种join,但是没有union的case。
>
> 没有任何watermark的问题,flink现有机制都是可以完美解决的。
>
>
>
> 赵一旦  于2020年9月28日周一 上午11:37写道:
>
>
>
> >
> 说实话,还是不觉得有这种case。KafkaSouceA(1,1,1,2,2,2,.,100),KafkaSouceB(1,100),然后AB都接到
>
> > operatorC 上,operatorC后续跟个窗口算子operatorD。
>
> > (1)oepratorC负责调用assignTimestampsAndWatermarks
>
> >
> 完成watermark的生成。这么搞会出问题,因为operatorC会更快收到KafkaSouceB的100,进而生成watermark=100(假设maxOutOfOrderness=0)。那么operatorD在收到KafkaSouceA的后续元素会认为迟到丢弃。
>
> >
>
> >
>
> >
> (2)如果在KafkaSouceA部分直接生成watermark,KafkaSourceB部分也直接生成watermark,然后算子operatorC部分相当于是watermark的合并取小,对于后续的operatorD也是不会影响的。只有KafkaSouceA和KafkaSouceB的100都到达(此时KafkaSouceA和KafkaSouceB都分别发出了100的watermark),进而对于operatorC的watermark才会推进到100,再然后是operatorD的watermark推进到100。数据是不会丢失的呀。
>
> >
>
> >
>
> >
> (3)还有一种更暴力的,如果KafkaSouceA和B都是相同kafka集群,只是topic不同,完全可以合并为一个KafkaSouce,使用regex方式指定topic进行消费。
>
> > 最后一点,(2)中说的watermark赋值也有2中实现方案。1
>
> > 是在KafkaSouce后面通过forward分区方式跟一个相同并发的watermarkAssigner即可。 2 是不额外引入算子,而是直接调用
>
> > flinkKafkaConsumer.assignTimestampsAndWatermarks实现。
>
> >
>
> >
>
> > 最后,问下你是FlinkSQL场景还是DataStream API,会不会我们场景不同,sql场景可能是没有这么灵活。
>
> >
>
> > hao kong  于2020年9月28日周一 上午11:12写道:
>
> >
>
> >> 我目前的情况是从多个kafka
>
> >> topic获取数据并union到一起进行处理,例如A流的时间是1-100共1w条数据,B流只有时间是1和100的两条数据,由于目前flink
>
> >> source之间没有*Coordinator*
>
> >>
>
> >>
> ,两个流的流速在数据量相同的时候是一样的,在union后的timewindow标记watermark里会先拿到A流的1,B流的1,然后拿到A流的1.X,B流的100,这时根据watermark的配置,如果没有设置延迟等待,会丢弃掉A流剩下的9998条数据,如果是多源不union,并行处理的话,不会有这个问题。
>
> >>
>
> >> 赵一旦  于2020年9月27日周日 下午5:53写道:
>
> >>
>
> >> >
>
> >> >
>
> >>
> 如果说一共n个分区,但是你就是希望使用m
> >> > kafkaSouce本身会将当前并发实例消费的多个分区的数据分别生成watermark并取小后再发射出去。
>
> >> >
>
> >> > 赵一旦  于2020年9月27日周日 下午5:51写道:
>
> >> >
>
> >> > >
>
> >> > >
>
> >> >
>
> >>
> 还是没看懂。即使你有多个kafka数据源。但是从flink角度来说,就是从若干个partition(一共n个)输入数据(不管来自哪个kafka集群,哪个topic的分区),只要你设置的数据源并行度为n,保证每个并行度只消费一个分区,这样每个并行task实例只针对某一个分区生成watermark(这个要求单分区内部乱序不能太大)。后续算子的watermark是会基于输入算子所有并发实例的watermark取小的,所以貌似也不存在你说的问题。
>
> >> > >
>
> >> > > hao kong  于2020年9月27日周日 下午5:16写道:
>
> >> > >
>
> >> > >> hi
>
> >> > >> 感谢各位,pnowoj...@apache.org为我提供了一个FLIP,
>
> >> > >>
>
> >> > >>
>
> >> >
>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-EventTimeAlignment
>
> >> > >>
>
> >> >
>
> >>
> ,我的问题其实就是这个问题,快流会推高水印导致漫流里的数据被丢弃,我已经在开始尝试依赖zookeeper来写*SourceCoordinator,*
>
> >> > >> 从而协调多个源的数据,主要是考虑这样不用考虑高可用,单独的coordinator需要自己实现高可用。
>
> >> > >>
>
> >> > >> 赵一旦  于2020年9月21日周一 下午5:50写道:
>
> >> > >>
>
> >> > >> > 的确问题没说明白,貌似flink不会存在类似问题。
>
> >> > >> >
>
> >> > >> > hao kong  于2020年9月16日周三 下午6:45写道:
>
> >> > >> >
>
> >> > >> > > 十分感谢,我尝试实现一下,看看能不能通过processfuncton和反压机制配合实现一下。
>
> >> > >> > >
>
> >> > >> > > Congxian Qiu  于2020年9月16日周三 下午1:55写道:
>
> >> > >> > >
>
> >> > >> > > > Hi
>
> >> > >> > > > 没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢?
>
> >> > >> > > > 如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source
>
> >> > >> > > > 数据变成迟到的数据,那么我理解这个暂时没法直接解决的,因为 watermark 是 task 级别的。如果你想要在一个
>
> >> Flink
>
> >> > >> > > > 作业中实现更细粒度的 watermark,或许你可以尝试下 processfuncton[1] 自己来定制一套类似的机制
>
> >> > >> > > > 另外你也可以看下这个文档[2] 看是否在你的场景中有帮助
>
> >> > >> > > >
>
> >> > >> > > > [1]
>
> >> > >> > > >
>
> >> > >> > > >
>
> >> > >> > >
>
> >> > >> >
>
> >> > >>
>
> >> >
>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
>
> >> > >> > > > [2]
>
> >> > >> > > >
>
> >> > >> > > >
>
> >> > >> > >
>
> >> > >> >
>
> >> > >>
>
> >> >
>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
>
> >> > >> > > > Best,
>
> >> > >> > > > Congxian
>
> >> > >> > > >
>
> >> > >> > > >
>
> >> > >> > > > hao kong  于2020年9月16日周三 上午10:24写道:
>
> >> > >> > > >
>
> >> > >> > > > > hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time
>
> >> > >> > > > > window,它将导致数据较少的source通过water-mark覆盖数据更多的source。
>
> >> > >> > > > >
>
> >> > >> > > > >
>
> >> > >> > > >
>
> >> > >> > >
>
> >> > >> >
>
> >> > >>
>
> >> >
>
> >>
> 目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。
>
> >> > >> > > > >
>
> >> > >> > > >
>
> >> > >> > >
>
> >> > >> >
>
> >> > >>
>
> >> > >
>
> >> >
>
> >>
>
> >
>
>