有没有可能使用tikv作为flink 分布式的backend
项目里有部分需要进行状态共享的需求,多个flink 任务之间 如题,tikv本身基于rocksdb 是否有可能扩展成为分布式 backend -- Sent from: http://apache-flink.147419.n8.nabble.com/
flink on yarn配置问题
最近想试一下flink on yarn,yarn是公司之前就有的,但之前只运行过spark,现在想试一下flink。 但是不少报错,现在到如下情况了。 23:09:11.181 [main] ERROR com.xxx.Application - Main Method catched exception: {} org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster. at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:397) at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1733) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620) at com.xxx.Application.main(Application.java:116) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) Caused by: org.apache.flink.configuration.IllegalConfigurationException: The number of requested virtual cores for application master 1 exceeds the maximum number of virtual cores 0 available in the Yarn Cluster. at org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:283) at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444) at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:390) ... 22 common frames omitted org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster. at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:397) at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1733) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620) at com.xxx.Application.main(Application.java:116) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at
flink-sql-gateway还会更新吗
flink1.11用不了flink-sql-gateway,不知道还会不会更新 18579099...@163.com
Re: state序列化问题
了解了!谢谢! Yun Tang 于2020年8月21日周五 下午4:00写道: > Hi > > 其实你的问题就是MapState中的value本身是java的map结构,也就是对应MapStateDescriptor里面的valueSerializer是否需要区分显示声明成HashMap类型,这个取决于你的value > serializer实现,如果你用的是Flink内置的MapSerializer[1],没必要声明成HashMap类型。 > > > [1] > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java > > > 祝好 > 唐云 > > > From: shizk233 > Sent: Friday, August 21, 2020 10:51 > To: user-zh@flink.apache.org > Subject: Re: state序列化问题 > > 抱歉,是我表述不清楚,ListState>只是举个例子,并不是我的应用场景实际的状态。 > > 从实际考虑,我想利用MapState保存一系列特殊的计数器Map,也就是MapState>,主要用来做一个伪窗口,key是窗口的开始时间。 > > 主要想知道,在MapStateDescriptor声明类型信息时,我是否应该把内部Map声明成明确的HashMap类型,而不是Map类型? > > Yun Tang 于2020年8月21日周五 上午12:13写道: > > > Hi > > > > 如果想要在list中保存String,也就是list中的每个元素格式是String,ListState的格式应该是 > > ListState, 而不是 > > ListState>,后者表示有一个list,list中的每一个元素均是一个list > > > > ListState 本身并不属于java的collection,所以不存在ArrayList 与 LinkedList的区别。 > > > > 祝好 > > 唐云 > > > > From: shizk233 > > Sent: Thursday, August 20, 2020 18:00 > > To: user-zh@flink.apache.org > > Subject: state序列化问题 > > > > Hi all, > > > > 请教一下,State应该是通过StateDescriptor提取的类型信息来序列化/反序列化, > > 那么如果声明为接口类型,如ListState>,但实际存入的是ArrayList/LinkedList, > > 会对类型信息提取产生不良影响吗? > > > > 按我的理解,ArrayList和LinkedList在序列化时的bytes组成结构应该是不太一样的。 > > 但是都可以作为List>来声明。 > > > > 请求野生的大佬支援一下! > > >
????????????checkpoint????
Hello all, iterative stream job checkpoint??checkpoint state k??org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.?? ?? env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkpointConfig.setCheckpointTimeout(60); checkpointConfig.setMinPauseBetweenCheckpoints(6); checkpointConfig.setMaxConcurrentCheckpoints(4); checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); checkpointConfig.setPreferCheckpointForRecovery(true); checkpointConfig.setTolerableCheckpointFailureNumber(2); checkpointConfig.enableUnalignedCheckpoints(); ??
Re: 基于flink1.10源码编译问题
不要用aliyun maven repo,另外你这是1.10-SNAPSHOT 不是1.10的release版本 魏烽 于2020年8月21日周五 下午8:44写道: > 各位好: > > >
Re: Re: 如何设置FlinkSQL并行度
Hi forideal, 我在本地试了一下,没有复现你说的这个情况。 我看代码也没有这个逻辑,如果是没有分配到partition,应该是会阻塞住,而不是finish。 你这个测试用的是社区的版本么?还是有什么特殊的改动? forideal 于2020年8月21日周五 下午11:43写道: > Hi 赵一旦, > 基础信息:使用 watermark for 语法设置watermark,Flink SQL,Blink planner,Flink 1.10.0 > 我最近做了一个实验,将Flink SQL 的并发设置为 kafka topic partition 的 2 倍,同时设置 idle 的时间为 10s。 > 这时,1.source 会有一半的partition 立马就 finished > 2.下游的 workmark 变成了LONG的最大值 > 整个任务都无法正常运行了。 > > > Best forideal > > > > > 在 2020-08-17 10:05:48,"Zhao,Yi(SEC)" 写道: > >我这边才研究FlinkSQL没几天。不过按照目前了解,是不支持算子级别并行度设置的。 > > >此外你说的checkpoint无法正常触发,我估计是因为barrier的问题,部分并行示例没有分区数据,导致没数据就可能导致。这个问题类似,可能无解。 > > > >非要解决可以写代码,把souce部分不使用sql实现。 > >__ > > > >在 2020/8/15 下午8:21,“forideal” 写入: > > > >Hi 赵一旦, > > > > > >目前 Flink SQL 我这边使用也是无法指定各个算子的并行度。目前我这边遇到两个问题。 > >1.并行度超过 topic partition 的时候会造成资源浪费 > >2.并行度超过 topic partition 后,checkpoint 也无法正常触发了 > > > > > >Best forideal > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >在 2020-08-14 12:03:32,"赵一旦" 写道: > >>检查点呢,大多数用FlinkSQL的同学们,你们的任务是随时可运行那种吗,不是必须保证不可间断的准确性级别吗? > >> > >>Xingbo Huang 于2020年8月14日周五 下午12:01写道: > >> > >>> Hi, > >>> > >>> 关于并行度的问题,据我所知,目前Table API上还没法对每一个算子单独设置并行度 > >>> > >>> Best, > >>> Xingbo > >>> > >>> Zhao,Yi(SEC) 于2020年8月14日周五 上午10:49写道: > >>> > >>> > > 并行度问题有人帮忙解答下吗,此外补充个相关问题,除了并行度,flink-sql情况下,能做检查点/保存点,并基于检查点/保存点重启sql任务吗。 > >>> > > >>> > 发件人: "Zhao,Yi(SEC)" > >>> > 日期: 2020年8月13日 星期四 上午11:44 > >>> > 收件人: "user-zh@flink.apache.org" > >>> > 主题: 如何设置FlinkSQL并行度 > >>> > > >>> > 看配置文件有 execution. Parallelism,但这个明显是全局类型的配置。 > >>> > 如何给Sql生成的数据源结点,window结点,sink结点等设置不同的并行度呢? > >>> > > >>> > 比如数据源理论上应该和kafka分区数一致比较好,window则需要根据数据量考虑计算压力,sink也应该有相应的场景考虑。 > >>> > > >>> > > >>> > > > > > -- Best, Benchao Li
Re: flink orc与hive2.1.1版本冲突怎么解决
是说写数据用的是flink的FileSystem connector,然后读数据是用hive自己去读的?具体错误的stacktrace贴一下看看呢 On Fri, Aug 21, 2020 at 3:05 PM wrote: > flink table > sql把mysql的表数据写到hdfs的路径上,存成orc。hive创建外部表,查询报错。最根本原因是hive2.1是把orc的相关类打包一起的,包名举例,org.apache.hive.orc.,而且新版的是org.apache.orc.。 > > 发自我的iPhone > > > 在 2020年8月21日,14:37,Rui Li 写道: > > > > Hi, > > > > 能不能具体描述一下你的作业是怎么写的?比如怎么创建的hive表,如何把数据写进去之类的。我们可以试试能不能重现你的问题 > > > >> On Fri, Aug 21, 2020 at 1:41 PM wrote: > >> > >> 试过了,一样的,本质也是通过写文件。 > >> > >> 发自我的iPhone > >> > 在 2020年8月21日,13:35,Jingsong Li 写道: > >>> > >>> 是的 > >>> > On Fri, Aug 21, 2020 at 1:30 PM wrote: > > flink hive表的方式是什么意思?hive streaming吗? > > 发自我的iPhone > > >> 在 2020年8月21日,13:27,Jingsong Li 写道: > > > > Flink filesystem connector 或者 DataStream用flink-orc > 的版本是比较新的版本,所以老版本的ORC读不了。 > > > > 建议你用Flink hive表的方式来写orc > > > >> On Fri, Aug 21, 2020 at 12:25 PM wrote: > >> > >> Flink是指定orc版本的,并没有用hive的。所以写进去之后,hive读不出来。 > >> > >> 发自我的iPhone > >> > 在 2020年8月21日,12:15,Jingsong Li 写道: > >>> > >>> 如果是hive table的写,flink sql是使用hive对应版本的orc的,所以理论上效果和hive > sql写orc是一样的。 > >>> 确定这个版本hive写出的数据可以被读取吗? > >>> > On Fri, Aug 21, 2020 at 10:17 AM wrote: > > 使用版本是flink 1.11 > Hive 2.1.1 > flink sql写到orc后,创建外部表发现无法正常读取,这个怎么解决,各位大佬? > > > > >>> > >>> -- > >>> Best, Jingsong Lee > >> > > > > > > -- > > Best, Jingsong Lee > > > >>> > >>> -- > >>> Best, Jingsong Lee > >> > >> > > > > -- > > Best regards! > > Rui Li > > -- Best regards! Rui Li
flink prometheus 无法上报accumulator类型监控吗
如题,没找到accumulator类型数据,metric之类找到了,但是accumulator类没找到。
Re: flink orc与hive2.1.1版本冲突怎么解决
在内网,弄不出来。数组业界,在OrcFile$WriterVersion.from(OrcFile.java:145) 目前升级hive到2.3能正常用了,因为从hive2.3开始有独立的orc-core jar了。但是我们用的是CDH,这样子管理不太方便。 发自我的iPhone > 在 2020年8月21日,16:18,Rui Li 写道: > > 是说写数据用的是flink的FileSystem connector,然后读数据是用hive自己去读的?具体错误的stacktrace贴一下看看呢 > >> On Fri, Aug 21, 2020 at 3:05 PM wrote: >> >> flink table >> sql把mysql的表数据写到hdfs的路径上,存成orc。hive创建外部表,查询报错。最根本原因是hive2.1是把orc的相关类打包一起的,包名举例,org.apache.hive.orc.,而且新版的是org.apache.orc.。 >> >> 发自我的iPhone >> 在 2020年8月21日,14:37,Rui Li 写道: >>> >>> Hi, >>> >>> 能不能具体描述一下你的作业是怎么写的?比如怎么创建的hive表,如何把数据写进去之类的。我们可以试试能不能重现你的问题 >>> On Fri, Aug 21, 2020 at 1:41 PM wrote: 试过了,一样的,本质也是通过写文件。 发自我的iPhone >> 在 2020年8月21日,13:35,Jingsong Li 写道: > > 是的 > >> On Fri, Aug 21, 2020 at 1:30 PM wrote: >> >> flink hive表的方式是什么意思?hive streaming吗? >> >> 发自我的iPhone >> 在 2020年8月21日,13:27,Jingsong Li 写道: >>> >>> Flink filesystem connector 或者 DataStream用flink-orc >> 的版本是比较新的版本,所以老版本的ORC读不了。 >>> >>> 建议你用Flink hive表的方式来写orc >>> On Fri, Aug 21, 2020 at 12:25 PM wrote: Flink是指定orc版本的,并没有用hive的。所以写进去之后,hive读不出来。 发自我的iPhone >> 在 2020年8月21日,12:15,Jingsong Li 写道: > > 如果是hive table的写,flink sql是使用hive对应版本的orc的,所以理论上效果和hive >> sql写orc是一样的。 > 确定这个版本hive写出的数据可以被读取吗? > >> On Fri, Aug 21, 2020 at 10:17 AM wrote: >> >> 使用版本是flink 1.11 >> Hive 2.1.1 >> flink sql写到orc后,创建外部表发现无法正常读取,这个怎么解决,各位大佬? >> >> >> > > -- > Best, Jingsong Lee >>> >>> >>> -- >>> Best, Jingsong Lee >> >> > > -- > Best, Jingsong Lee >>> >>> -- >>> Best regards! >>> Rui Li >> >> > > -- > Best regards! > Rui Li
flink1.11 cdc使用
hi 我这面想使用flinkcdc做实时etl,我看可以做到维表(时态表)关联,现在想问一下能在cdc功能中用聚合算子嘛,全局groupby或窗口函数
Re: flink orc与hive2.1.1版本冲突怎么解决
Hi, 能不能具体描述一下你的作业是怎么写的?比如怎么创建的hive表,如何把数据写进去之类的。我们可以试试能不能重现你的问题 On Fri, Aug 21, 2020 at 1:41 PM wrote: > 试过了,一样的,本质也是通过写文件。 > > 发自我的iPhone > > > 在 2020年8月21日,13:35,Jingsong Li 写道: > > > > 是的 > > > >> On Fri, Aug 21, 2020 at 1:30 PM wrote: > >> > >> flink hive表的方式是什么意思?hive streaming吗? > >> > >> 发自我的iPhone > >> > 在 2020年8月21日,13:27,Jingsong Li 写道: > >>> > >>> Flink filesystem connector 或者 DataStream用flink-orc > >> 的版本是比较新的版本,所以老版本的ORC读不了。 > >>> > >>> 建议你用Flink hive表的方式来写orc > >>> > On Fri, Aug 21, 2020 at 12:25 PM wrote: > > Flink是指定orc版本的,并没有用hive的。所以写进去之后,hive读不出来。 > > 发自我的iPhone > > >> 在 2020年8月21日,12:15,Jingsong Li 写道: > > > > 如果是hive table的写,flink sql是使用hive对应版本的orc的,所以理论上效果和hive sql写orc是一样的。 > > 确定这个版本hive写出的数据可以被读取吗? > > > >> On Fri, Aug 21, 2020 at 10:17 AM wrote: > >> > >> 使用版本是flink 1.11 > >> Hive 2.1.1 > >> flink sql写到orc后,创建外部表发现无法正常读取,这个怎么解决,各位大佬? > >> > >> > >> > > > > -- > > Best, Jingsong Lee > > >>> > >>> > >>> -- > >>> Best, Jingsong Lee > >> > >> > > > > -- > > Best, Jingsong Lee > > -- Best regards! Rui Li
Re: state序列化问题
Hi 其实你的问题就是MapState中的value本身是java的map结构,也就是对应MapStateDescriptor里面的valueSerializer是否需要区分显示声明成HashMap类型,这个取决于你的value serializer实现,如果你用的是Flink内置的MapSerializer[1],没必要声明成HashMap类型。 [1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java 祝好 唐云 From: shizk233 Sent: Friday, August 21, 2020 10:51 To: user-zh@flink.apache.org Subject: Re: state序列化问题 抱歉,是我表述不清楚,ListState>只是举个例子,并不是我的应用场景实际的状态。 从实际考虑,我想利用MapState保存一系列特殊的计数器Map,也就是MapState>,主要用来做一个伪窗口,key是窗口的开始时间。 主要想知道,在MapStateDescriptor声明类型信息时,我是否应该把内部Map声明成明确的HashMap类型,而不是Map类型? Yun Tang 于2020年8月21日周五 上午12:13写道: > Hi > > 如果想要在list中保存String,也就是list中的每个元素格式是String,ListState的格式应该是 > ListState, 而不是 > ListState>,后者表示有一个list,list中的每一个元素均是一个list > > ListState 本身并不属于java的collection,所以不存在ArrayList 与 LinkedList的区别。 > > 祝好 > 唐云 > > From: shizk233 > Sent: Thursday, August 20, 2020 18:00 > To: user-zh@flink.apache.org > Subject: state序列化问题 > > Hi all, > > 请教一下,State应该是通过StateDescriptor提取的类型信息来序列化/反序列化, > 那么如果声明为接口类型,如ListState>,但实际存入的是ArrayList/LinkedList, > 会对类型信息提取产生不良影响吗? > > 按我的理解,ArrayList和LinkedList在序列化时的bytes组成结构应该是不太一样的。 > 但是都可以作为List>来声明。 > > 请求野生的大佬支援一下! >
Re: Flink 启动问题
从代码上看,-yt 后的都会文件夹内容都会上传到HDFS => 1.7版本: 只有一个-yt参数生效,并且-yt后只能跟文件夹。例如-yt /tmp/xx (实测) => 1.8到1.11版本: 可以有多个-yt参数,并且-yt后只能跟文件夹 例如-yt /tmp/xx1 -yt /tmp/xx2 (实测) => 1.11版本以后(master分支): 可以有多个-yt参数。-yt后可以跟文件或文件夹 - guaishushu1...@163.com wrote > 大佬们知道 flink 的-yt命令是不支持多个目录吗,而且只能上传到集群.jar文件吗??? > > > guaishushu1103@ -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink orc与hive2.1.1版本冲突怎么解决
flink table sql把mysql的表数据写到hdfs的路径上,存成orc。hive创建外部表,查询报错。最根本原因是hive2.1是把orc的相关类打包一起的,包名举例,org.apache.hive.orc.,而且新版的是org.apache.orc.。 发自我的iPhone > 在 2020年8月21日,14:37,Rui Li 写道: > > Hi, > > 能不能具体描述一下你的作业是怎么写的?比如怎么创建的hive表,如何把数据写进去之类的。我们可以试试能不能重现你的问题 > >> On Fri, Aug 21, 2020 at 1:41 PM wrote: >> >> 试过了,一样的,本质也是通过写文件。 >> >> 发自我的iPhone >> 在 2020年8月21日,13:35,Jingsong Li 写道: >>> >>> 是的 >>> On Fri, Aug 21, 2020 at 1:30 PM wrote: flink hive表的方式是什么意思?hive streaming吗? 发自我的iPhone >> 在 2020年8月21日,13:27,Jingsong Li 写道: > > Flink filesystem connector 或者 DataStream用flink-orc 的版本是比较新的版本,所以老版本的ORC读不了。 > > 建议你用Flink hive表的方式来写orc > >> On Fri, Aug 21, 2020 at 12:25 PM wrote: >> >> Flink是指定orc版本的,并没有用hive的。所以写进去之后,hive读不出来。 >> >> 发自我的iPhone >> 在 2020年8月21日,12:15,Jingsong Li 写道: >>> >>> 如果是hive table的写,flink sql是使用hive对应版本的orc的,所以理论上效果和hive sql写orc是一样的。 >>> 确定这个版本hive写出的数据可以被读取吗? >>> On Fri, Aug 21, 2020 at 10:17 AM wrote: 使用版本是flink 1.11 Hive 2.1.1 flink sql写到orc后,创建外部表发现无法正常读取,这个怎么解决,各位大佬? >>> >>> -- >>> Best, Jingsong Lee >> > > > -- > Best, Jingsong Lee >>> >>> -- >>> Best, Jingsong Lee >> >> > > -- > Best regards! > Rui Li