Flink 1.12 Blink planner timestamp类型转换异常

2022-01-11 文章 张健
Hi, all




flink1.12 Blink planner有人遇到过这样的问题么:



下面是简化的逻辑
DataStream ds = .map(xxxRichMapFunction);
Table table = tableEnv.fromDataStream(ds);
tableEnv.toAppendStream(table.select(xxx).keyBy(xxx), 
Row.class).addSink(xxxRichSinkFunction);


xxxRichMapFunction中对某个字段写了row.setField(index, java.sql.timestamp.valueOf(str)); 
即写入的值是一个 java.sql.timestamp类型的字段,且xxxRichMapFunction 中getProducedType() 
中对应的位置也是Types.SQL_TIMESTAMP。
但在xxxRichSinkFunction我读取该字段时,却变成了java.time.LocalDateTime类型,导致在JdbcUtils.setRecordToStatement中抛出了java.lang.ClassCastException


我用 Blink planner就会有这个问题,如果程序启动时用useOldPlanner()就不会有这个问题。


张健




 





 

Flink 1.12 Blink planner timestamp类型转换异常

2022-01-11 文章 张健
Hi, all




flink1.12 Blink planner有人遇到过这样的问题么:



下面是简化的逻辑
DataStream ds = .map(xxxRichMapFunction);
Table table = tableEnv.fromDataStream(ds);
tableEnv.toAppendStream(table.select(xxx).keyBy(xxx), 
Row.class).addSink(xxxRichSinkFunction);


xxxRichMapFunction中对某个字段写了row.setField(index, java.sql.timestamp.valueOf(str)); 
即写入的值是一个 java.sql.timestamp类型的字段,且xxxRichMapFunction 中getProducedType() 
中对应的位置也是Types.SQL_TIMESTAMP。
但在xxxRichSinkFunction我读取该字段时,却变成了java.time.LocalDateTime类型,导致在JdbcUtils.setRecordToStatement中抛出了java.lang.ClassCastException


我用 Blink planner就会有这个问题,如果程序启动时用useOldPlanner()就不会有这个问题。


张健




 

退订

2022-01-11 文章 qhp...@hotmail.com
退订



qhp...@hotmail.com


Re: sql-gateway和jdbc-driver还维护吗?

2022-01-11 文章 godfrey he
Hi Ada,

sql-gateway之前没有维护起来,确实是一个遗憾。
最近我们也关注到大家对batch的兴趣越来越浓,sql-gateway还会继续维护。

btw,非常欢迎分享一下你们使用Flink替换Spark遇到的一些痛点,我们会逐渐去解决这些痛点

Best,
Godfrey

Ada Wong  于2022年1月12日周三 10:09写道:
>
> cc tsreaper and Godfrey He
>
> 文末丶 <809097...@qq.com.invalid> 于2022年1月10日周一 19:39写道:
>
> >
> > 试下https://github.com/DataLinkDC/dlink 看看能不能满足你的需求
> >
> >
> >
> >
> > --原始邮件--
> > 发件人:
> > "user-zh"   
> >  
> >  > 发送时间:2022年1月10日(星期一) 晚上7:32
> > 收件人:"user-zh" >
> > 主题:Re: sql-gateway和jdbc-driver还维护吗?
> >
> >
> >
> > https://github.com/ververica/flink-jdbc-driver
> > https://github.com/ververica/flink-sql-gateway
> >
> > Ada Wong  > 
> >  我看这俩项目一两年没更新了。想用Flink彻底替换到Spark,这俩项目是刚需,用来替换SparkThriftServer。


Re: sql-gateway和jdbc-driver还维护吗?

2022-01-11 文章 Ada Wong
cc tsreaper and Godfrey He

文末丶 <809097...@qq.com.invalid> 于2022年1月10日周一 19:39写道:

>
> 试下https://github.com/DataLinkDC/dlink 看看能不能满足你的需求
>
>
>
>
> --原始邮件--
> 发件人:  
>   "user-zh"   
>  
>  发送时间:2022年1月10日(星期一) 晚上7:32
> 收件人:"user-zh"
> 主题:Re: sql-gateway和jdbc-driver还维护吗?
>
>
>
> https://github.com/ververica/flink-jdbc-driver
> https://github.com/ververica/flink-sql-gateway
>
> Ada Wong  
>  我看这俩项目一两年没更新了。想用Flink彻底替换到Spark,这俩项目是刚需,用来替换SparkThriftServer。


Flink 1.12 Blink planner timestamp类型转换异常

2022-01-11 文章 张健
Hi, all




flink1.12 Blink planner有人遇到过这样的问题么:



下面是简化的逻辑
DataStream ds = .map(xxxRichMapFunction);
Table table = tableEnv.fromDataStream(ds);
tableEnv.toAppendStream(table.select(xxx).keyBy(xxx), 
Row.class).addSink(xxxRichSinkFunction);


xxxRichMapFunction中对某个字段写了row.setField(index, java.sql.timestamp.valueOf(str)); 
即写入的值是一个 java.sql.timestamp类型的字段,且xxxRichMapFunction 中getProducedType() 
中对应的位置也是Types.SQL_TIMESTAMP。
但在xxxRichSinkFunction我读取该字段时,却变成了java.time.LocalDateTime类型,导致在JdbcUtils.setRecordToStatement中抛出了java.lang.ClassCastException


我用 Blink planner就会有这个问题,如果程序启动时用useOldPlanner()就不会有这个问题。


张健

(无主题)

2022-01-11 文章 生如夏花
退订

Re: 如何确定分配内存的大小

2022-01-11 文章 Chang Li
目前这个更多还是一个经验值,和具体业务有关使用有关,建议任务运行后观察JM和TM的GC情况后再做调整

许友昌 <18243083...@163.com> 于2022年1月10日周一 15:18写道:

> 请问在启动flink 任务时,要如何确定该分配多少内存给 jobmanager,分配多少给 taskmanager,当我们指定 -ytm 1024
> 或 -ytm 2048 的依据是什么?


Re: 谁能解释一下 GlobalStreamExchangeMode 这几种交换模式的不同和使用场景吗?

2022-01-11 文章 Chang Li
在生产环境中使用Flink是批示作业是OK的,不是很依赖Flink Remote Shuffle Service
Flink Remote Shuffle Service
主要解决大数据量Shuffle场景下的稳定性,目前Batch会将Shuffle的结果写本地磁盘,数量大的时候会容易将磁盘写满,稳定性也相对比较差

casel.chen  于2021年12月2日周四 08:26写道:

> GlobalStreamExchangeMode 这几种交换模式的不同和使用场景是什么?哪些适合流式作业,哪些适合批式作业?
> Flink Remote Shuffle Service的推出是不是意味着可以在生产环境使用Flink处理批式作业?谢谢!
>
>
> package org.apache.flink.streaming.api.graph;
>
>
>
>
> import org.apache.flink.annotation.Internal;
>
>
>
>
> @Internal
>
> public enum GlobalStreamExchangeMode {
>
> ALL_EDGES_BLOCKING,
>
> FORWARD_EDGES_PIPELINED,
>
> POINTWISE_EDGES_PIPELINED,
>
> ALL_EDGES_PIPELINED,
>
> ALL_EDGES_PIPELINED_APPROXIMATE;
>
>
>
>
> private GlobalStreamExchangeMode() {
>
> }
>
> }
>
>
>


Re: 关于streamFileSink在checkpoint下生成文件问题

2022-01-11 文章 Chang Li
直接用的开源版本吗?还是公司内部有改动,原生的cp是固定频率,而很多公司离线计算都是整点触发的,为了减少延迟,会自定义在整点触发一次cp,开源目前没有这个feature

黄志高  于2021年12月1日周三 21:53写道:

> hi,各位大佬,咨询个问题
>
>  
> 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看
>
>
>
>


Re: Re: 关于streamFileSink在checkpoint下生成文件问题

2022-01-11 文章 Chang Li
直接用的开源版本吗?还是公司内部有改动,原生的cp是固定频率,而很多公司离线计算都是整点触发的,为了减少延迟,会自定义在整点触发一次cp,开源目前没有这个feature

黄志高  于2021年12月2日周四 14:14写道:

> |
>
>
>
>
> 32684
> |
> COMPLETED
> | 8/8 | 13:52:36 | 13:52:38 | 2s | 126 KB | 0 B |
> | | 32683 |
> COMPLETED
> | 8/8 | 13:42:36 | 13:42:39 | 2s | 126 KB | 0 B |
> | | 32682 |
> COMPLETED
> | 8/8 | 13:32:36 | 13:32:39 | 2s | 126 KB | 0 B |
> | | 32681 |
> COMPLETED
> | 8/8 | 13:22:36 | 13:22:39 | 2s | 125 KB | 0 B |
> | | 32680 |
> COMPLETED
> | 8/8 | 13:12:36 | 13:12:39 | 2s | 125 KB | 0 B |
> | | 32679 |
> COMPLETED
> | 8/8 | 13:02:36 | 13:02:41 | 4s | 214 KB | 0 B |
> 上图是checkpoint
>
>
> 这个是在11月30号0时段生成的文件
> 2021-11-30 00:00:011080827 athena_other-0-217891.gz
> 2021-11-30 00:02:424309209 athena_other-0-217892.gz
> 2021-11-30 00:12:403902474 athena_other-0-217893.gz
> 2021-11-30 00:22:403886322 athena_other-0-217894.gz
> 2021-11-30 00:32:403988037 athena_other-0-217895.gz
> 2021-11-30 00:42:403892343 athena_other-0-217896.gz
> 2021-11-30 00:52:392972183 athena_other-0-217897.gz
> 2021-11-30 00:00:011125774 athena_other-1-219679.gz
> 2021-11-30 00:02:424338748 athena_other-1-219680.gz
> 2021-11-30 00:12:404204571 athena_other-1-219681.gz
> 2021-11-30 00:22:403852791 athena_other-1-219682.gz
> 2021-11-30 00:32:404025214 athena_other-1-219683.gz
> 2021-11-30 00:42:404205107 athena_other-1-219684.gz
> 2021-11-30 00:52:392922192 athena_other-1-219685.gz
> 2021-11-30 00:00:011103734 athena_other-2-220084.gz
>
>
> 这个是1点生成的文件
> 2021-11-30 01:00:011228793 athena_other-0-217951.gz
> 2021-11-30 01:02:424243566 athena_other-0-217952.gz
> 2021-11-30 01:12:404106305 athena_other-0-217953.gz
> 2021-11-30 01:22:404456214 athena_other-0-217954.gz
> 2021-11-30 01:32:414303156 athena_other-0-217955.gz
> 2021-11-30 01:42:404688872 athena_other-0-217956.gz
> 2021-11-30 01:52:403251910 athena_other-0-217957.gz
> 2021-11-30 01:00:011163354 athena_other-1-219736.gz
> 2021-11-30 01:02:424405233 athena_other-1-219737.gz
> 2021-11-30 01:12:404094502 athena_other-1-219738.gz
> 2021-11-30 01:22:404395071 athena_other-1-219739.gz
> 2021-11-30 01:32:404205169 athena_other-1-219740.gz
> 2021-11-30 01:42:404432610 athena_other-1-219741.gz
> 2021-11-30 01:52:403224111 athena_other-1-219742.gz
> 2021-11-30 01:00:011163964 athena_other-2-220137.gz
>
>
>
>
> 之前的截图无法发送,我把文件贴出来,打扰了
>
>
>
>
>
>
>
> 在 2021-12-02 13:52:43,"黄志高"  写道:
>
>
>
>
>
> Hi,我把文件放到下面的,文件在checkpoint可见我是理解的,但是文件的生成时间应该是在checkpoint以后是正常的,但是我却在每个整点时段看见数据文件,如下图所示,按理说文件的生成都是在checkpoint之后的,也就是2分,12,22,32,42,52分后,而每个00分都会生成一个数据文件,不理解这个文件怎么生成的,内部的滚动策略是OnCheckpointRollingPolicy
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-12-02 11:37:31,"Caizhi Weng"  写道:
> >Hi!
> >
> >邮件里看不到图片和附件,建议使用外部图床。
> >
> >partFile 文件是不是以英文句点开头的?这是因为 streamingFileSink 写文件的时候还没做 checkpoint,为了保证
> >exactly once,这些临时写下的 .partFile 文件都是不可见的,需要等 checkpoint 之后才会重命名成可见的文件。
> >
> >黄志高  于2021年12月1日周三 下午9:53写道:
> >
> >> hi,各位大佬,咨询个问题
> >>
> >>
> 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看
> >>
> >>
> >>
> >>
>
>
>
>
>
>


回复: flink sql 如何提高下游并发度?

2022-01-11 文章 许友昌
hi,


设置了parallelism=10 ,实际上是分配了 10 个 slot,flink 是会共享 slot 的,所以 sink 会有 10 线程。 

在2022年1月11日 16:53,RS 写道:
Hi,
请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10?
如果source是10的话,那还有7个线程就空闲了?



在 2022-01-11 11:10:41,"Caizhi Weng"  写道:
Hi!

可以设置 parallelism.default 为需要的并发数。

Jeff  于2022年1月9日周日 19:44写道:

当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?


Re: Flink on Native K8s 部署模式下Tm和Jm容器配置Hosts问题

2022-01-11 文章 Yang Wang
你可以通过环境变量或者flink config option的方式来指定kube config

export KUBECONFIG=/path/of/kube.config

或者

-Dkubernetes.config.file=/path/of/kube.config

具体的代码在这里[1]

[1].
https://github.com/apache/flink/blob/master/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClientFactory.java#L58


Best,
Yang

JianWen Huang  于2022年1月10日周一 22:04写道:

> 首先感谢您答复。我也想到了采用第二种JOB动态+ConfigMap挂到Flink Client Pod中,然后命令提交。
> 另外您和官方文档都提到kube config的配置。请问flink client在源码实现中是在哪个地方去解析读取kube config的?
>
> Yang Wang  于2022年1月10日周一 15:17写道:
> >
> > 抱歉回复晚了
> >
> > 在实践中,Flink on Native K8s的部署方式需要一个机器同时拥有k8s和flink客户端才能很好的完成部署工作。
> >
> > Flink client并不依赖K8s客户端的,只要有对应的kube config就可以了
> >
> >
> > 你说的两种方法都是可以的,而且也没有本质上的差异。都是把Flink client运行在集群内来完成提交,第一种是常驻的,第二种是动态起的 。
> > 如果作业使用的pod template都是一样的,那就可以自己保存在ConfigMap中然后挂载给Flink client pod就可以了。
> > 如果每个作业使用的都不同,就只能按照你说的方法了
> >
> >
> > 另外,还有一个可行的思路是开发一个你们自己的K8s operator,然后通过CR的方式进行传递。可以参考这个简单的demo[1]
> >
> > [1]. https://github.com/wangyang0918/flink-native-k8s-operator
> >
> >
> > Best,
> > Yang
> >
> >
> >
> > JianWen Huang  于2021年12月30日周四 00:01写道:
> >
> > > 明白了。感谢。
> > > 在实践中,Flink on Native K8s的部署方式需要一个机器同时拥有k8s和flink客户端才能很好的完成部署工作。
> > > 请问在工程实践上有什么比较好的持续集成提交方式。我目前想到两种。
> > > 1.在k8s 启动一个带flink客户端的容器。在容器内部进行命令行提交。
> > > 2.在k8s以带Flink客户端的镜像启动一个Job类型作业,然后在作业运行时进行命令提交。
> > >
> > >
> 第1种对于kubernetes.pod-template-file的提交需要把kubernetes.pod-template-file中的模板文件cp到容器中。
> > > 第2种需要提前把kubernetes.pod-template-file文件打到带Flink客户端的镜像中。
> > > 请问您有更好的方法吗。
> > >
> > > Yang Wang  于2021年12月26日周日 16:39写道:
> > > >
> > > > 拿如下提交命令举例,pod-temlate.yaml是在和运行run-application这个命令相同的机器上面。Flink
> > > > client会自动把这个文件存放到ConfigMap,然后挂载给JM的
> > > > user jar(StateMachineExample.jar)是需要在镜像里面
> > > >
> > > > 注意:一般需要在镜像里面的都会使用local://这个schema,本地文件则不需要
> > > >
> > > > bin/flink run-application -t kubernetes-application \
> > > > -Dkubernetes.cluster-id=my-flink-cluster \
> > > > -Dkubernetes.pod-template-file=/path/of/pod-template.yaml \
> > > > local:///opt/flink/examples/streaming/StateMachineExample.jar
> > > >
> > > >
> > > >
> > > > 如果还是不明白,看一下这个测试的实现就清楚了[1]
> > > >
> > > > [1].
> > > >
> > >
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/test-scripts/test_kubernetes_application_ha.sh
> > > >
> > > >
> > > > Best,
> > > > Yang
> > > >
> > > > 黄剑文  于2021年12月24日周五 17:57写道:
> > > >
> > > > > client-local的文件,不是镜像里面的。这句话该怎么理解?因为run-application
> > > > >
> > > > >
> > >
> 模式下是需要将用户jar包跟flink标准镜像打到一起形成自己镜像然后进行提交。那么这个文件该放在哪个地方?目前我指定路径发现读的是镜像包中的路径。如/opt/my-pod-template。读的是镜像中/opt/my-pod-template文件。
> > > > >
> > > > > 谢谢您的回复。
> > > > >
> > > > > Yang Wang  于2021年12月24日周五 11:18写道:
> > > > > >
> > > > > > 使用flink
> > > > > >
> > >
> run-application来提交任务时,kubernetes.pod-template-file需要指定的是一个client-local的文件
> > > > > > 不是镜像里面的
> > > > > >
> > > > > > Best,
> > > > > > Yang
> > > > > >
> > > > > > hjw <1010445...@qq.com.invalid> 于2021年12月23日周四 22:21写道:
> > > > > >
> > > > > > > Flink版本:1.13Flink基于Native K8s
> > > > > > >
> > > > >
> > >
> 部署模式下,因为有场景需要,jobmanager和taskmanager需要配置一些特定的hosts,查阅官方文档后发现可以支持自己指定一些pod-Template来指定jm和tm的一些K8s部署行为,但这些pod-Template需要打在提交客户端镜像里。
> > > > > > >
> > > > > > >
> > > > >
> > >
> 问题是jm和tm在不同环境下需要配置的Hosts并不相同。如开发环境,测试环境,生产环境。这意味着不同环境需维护不同的镜像。请问各位在使用上有什么好方法去解决呢。谢谢。
> > > > >
> > >
>


Re: flink sql 如何提高下游并发度?

2022-01-11 文章 Chang Li
可以的,提供一个思路,读取了kafka的数据后,直接输出原生的字节流后,接一层map算子做序列化相关工作,map算子的并发度你可以自己控制,这样kafka拉取就不会是瓶颈,大量的计算工作放到了map中,而map的并发度是可以自己控制的

Jeff  于2022年1月9日周日 19:45写道:

> 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?


回复: flink sql 如何提高下游并发度?

2022-01-11 文章 JasonLee
hi


是 10 目前 source 还不支持单独设置并发度,但是 sink 是支持的,当然如果没有单独设置的话 sink 也是 10


Best
JasonLee


在2022年01月11日 16:52,RS 写道:
Hi,
请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10?
如果source是10的话,那还有7个线程就空闲了?



在 2022-01-11 11:10:41,"Caizhi Weng"  写道:
Hi!

可以设置 parallelism.default 为需要的并发数。

Jeff  于2022年1月9日周日 19:44写道:

当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?


Re: flink sql 如何提高下游并发度?

2022-01-11 文章 chang li
可以的,提供一个思路,读取了kafka的数据后,直接输出原生的字节流后,接一层map算子做序列化相关工作,map算子的并发度你可以自己控制,这样kafka拉取就不会是瓶颈,大量的计算工作放到了map中,而map的并发度是可以自己控制的

Caizhi Weng  于2022年1月11日周二 11:11写道:

> Hi!
>
> 可以设置 parallelism.default 为需要的并发数。
>
> Jeff  于2022年1月9日周日 19:44写道:
>
> > 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?
>


Re:Re: flink sql 如何提高下游并发度?

2022-01-11 文章 RS
Hi,
请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10?
如果source是10的话,那还有7个线程就空闲了?



在 2022-01-11 11:10:41,"Caizhi Weng"  写道:
>Hi!
>
>可以设置 parallelism.default 为需要的并发数。
>
>Jeff  于2022年1月9日周日 19:44写道:
>
>> 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?