Re: flink row 类型

2020-07-23 文章 Jingsong Li
可以看下Flink 1.11的UDF type inference. 在TypeInference中有input的type,这个type应该是包含字段信息的。 Best, Jingsong On Thu, Jul 23, 2020 at 2:09 PM Dream-底限 wrote: > hi > 是的,我们的数据场景比较尴尬,那我想其他方式实现一下 > > Benchao Li 于2020年7月23日周四 下午12:55写道: > > > 这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。 > >

Re: Is it possible to do state migration with checkpoints?

2020-07-23 文章 Sivaprasanna
+user-zh@flink.apache.org A follow up question. I tried taking a savepoint but the job failed immediately. It happens everytime I take a savepoint. The job is running on a Yarn cluster so it fails with "container running out of memory". The state size averages around 1.2G but also peaks to ~4.5

Re: flink row 类型

2020-07-23 文章 Dream-底限
hi 是的,我们的数据场景比较尴尬,那我想其他方式实现一下 Benchao Li 于2020年7月23日周四 下午12:55写道: > 这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。 > 你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。 > > Dream-底限 于2020年7月22日周三 下午7:22写道: > > > hi、 > > 我这面定义row数据,类型为ROW,可以通过 > >

Re: flink row 类型

2020-07-23 文章 xiao cai
可以考虑把字段索引值保存下来再获取 原始邮件 发件人: Dream-底限 收件人: user-zh 发送时间: 2020年7月23日(周四) 14:08 主题: Re: flink row 类型 hi 是的,我们的数据场景比较尴尬,那我想其他方式实现一下 Benchao Li 于2020年7月23日周四 下午12:55写道: > 这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。 > 你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。 > > Dream-底限 于2020年7月22日周三

Re: flink row 类型

2020-07-23 文章 Dream-底限
hi、Jingsong Li 我查看了对应的api,并运行了demo测试,通过CallContext我可以拿到对应的字段类型,但是无法拿到对应的字段名称 >>在TypeInference中有input的type,这个type应该是包含字段信息的。 xiao cai 于2020年7月23日周四 下午2:19写道: > 可以考虑把字段索引值保存下来再获取 > > > 原始邮件 > 发件人: Dream-底限 > 收件人: user-zh > 发送时间: 2020年7月23日(周四) 14:08 > 主题: Re: flink row 类型 > > > hi

Re: flink1.11 tablefunction

2020-07-23 文章 Dream-底限
hi 这貌似确实是一个bug,先用子查询打开后程序就可以运行正常了 Benchao Li 于2020年7月23日周四 下午12:52写道: > 现在有一个work around,就是你可以用子查询先把row展开,比如: > select ... > from ( > select data.rule_results as rule_results, ... > ) cross join unnest(rule_results) as t(...) > > Benchao Li 于2020年7月23日周四 下午12:44写道: > > >

application模式提交操作hive的任务相关疑问

2020-07-23 文章 Jun Zhang
大家好: 我现在有一个flink的程序要读写hive的数据,在程序中构造HiveCatalog的时候需要有一个hiveConfDir,如果我使用的是新的application模式去提交任务,这个任务的解析应该是放到了master端,这个时候hadoop集群上没有hive的相关配置,那么这个hiveConfDir该怎么配置呢? 谢谢

flink-1.11 在 windows 下怎样启动

2020-07-23 文章 wangl...@geekplus.com.cn
我看 flink-1.11 发布包 bin 目录没有 windows 启动所需的 .bat 文件了。 那在 windows 下怎样启动呢? 谢谢 王磊 wangl...@geekplus.com.cn

metrics influxdb reporter 不支持https及jar放置路径问题

2020-07-23 文章 zz zhang
hello,目前Flink1.11.1 发布的org.apache.flink.metrics.influxdb.InfluxdbReporter默认是上报是http协议,并不支持https协议,源码参考[2] 另外,文档[1]标注的需要将 /opt/flink-metrics-influxdb-1.11.0.jar复制到目录plugins/influxdb,经过测试应该是要复制到目录plugins/metrics-influx [1]

Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-23 文章 WeiXubin
Hi, 我想请问下使用 streamExecutionEnv.execute("from kafka sink hbase"),通过这种方式可以给Job指定名称。 而当使用streamTableEnv.executeSql(sql)之后似乎无法给Job定义名称。 请问有什么解决方案吗?谢谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql 1.11 kafka source with子句使用新参数,下游消费不到数据

2020-07-23 文章 Zhou Zach
Hi all, 根据文档https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position, 使用新参数创建kafka_table,下游消费不到数据,使用老参数下游可以消费到数据,是不是新参数的方式有坑啊 老参数: streamTableEnv.executeSql( """ | |CREATE TABLE kafka_table ( |uid

Re: Flink 1.11 submit job timed out

2020-07-23 文章 SmileSmile
Hi Yang Wang 先分享下我这边的环境版本 kubernetes:1.17.4. CNI: weave 1 2 3 是我的一些疑惑 4 是JM日志 1. 去掉taskmanager-query-state-service.yaml后确实不行 nslookup kubectl exec -it busybox2 -- /bin/sh / # nslookup 10.47.96.2 Server: 10.96.0.10 Address: 10.96.0.10:53 ** server can't find

Re: flink row 类型

2020-07-23 文章 Dream-底限
hi xiao cai 我懂你的意思了,这确实是一种解决方式,不过这种方式有一个弊端就是每个这种功能都要开发对应的方法,我还是比较倾向于一个方法适用于一类场景,如果做不到只能每次有需求都重新开发了 xiao cai 于2020年7月23日周四 下午3:40写道: > Hi ,Dream > > > 比如你最终拿到的是Row(10),10表示有10个字段,这些字段的顺序是固定的,那么你可以把每个字段在row里的索引的映射关系保存下来,如下 > map ,然后 row.getField(map.get(fieldName))获取你需要的值 > > > 原始邮件 > 发件人:

Re: (无主题)

2020-07-23 文章 shizk233
恭喜! 罗显宴 <15927482...@163.com> 于2020年7月23日周四 上午1:14写道: > 感谢shizk233大佬,我这个问题终于得到解决,我主要是通过全窗口加mapstate实现的 > best > shizk233 > > > | | > 罗显宴 > | > | > 邮箱:15927482...@163.com > | > > 签名由 网易邮箱大师 定制 > > 在2020年07月21日 15:04,罗显宴 写道: >

Re: flink row 类型

2020-07-23 文章 xiao cai
Hi ,Dream 比如你最终拿到的是Row(10),10表示有10个字段,这些字段的顺序是固定的,那么你可以把每个字段在row里的索引的映射关系保存下来,如下 map ,然后 row.getField(map.get(fieldName))获取你需要的值 原始邮件 发件人: Dream-底限 收件人: user-zh 发送时间: 2020年7月23日(周四) 14:57 主题: Re: flink row 类型 hi、xiao cai 可以说一下思路吗,我没太懂 》》可以考虑把字段索引值保存下来再获取 Dream-底限 于2020年7月23日周四 下午2:56写道:

flink????kafka????????????????????

2020-07-23 文章 ??????
??flinkflinkkafka

Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-23 文章 godfrey he
这个问题的已经有一个issue:https://issues.apache.org/jira/browse/FLINK-18545,请关注 WeiXubin <18925434...@163.com> 于2020年7月23日周四 下午6:00写道: > Hi, > 我想请问下使用 streamExecutionEnv.execute("from kafka sink > hbase"),通过这种方式可以给Job指定名称。 > 而当使用streamTableEnv.executeSql(sql)之后似乎无法给Job定义名称。 > 请问有什么解决方案吗?谢谢 > > > > -- >

Re:Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-23 文章 Weixubin
Hi, 我想请教下,使用streamExecutionEnv.execute("from kafka sink hbase") 是可以指定Job的名称。 而当改用streamTableEnv.executeSql(sql)的方式时,似乎无法定义Job的名称。 请问有什么解决的方法吗? 在 2020-07-08 16:07:17,"Jingsong Li" 写道: >Hi, > >你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。 > >所以你后面

Re: flink 1.11 executeSql查询kafka表print没有输出

2020-07-23 文章 godfrey he
client端会不断的pull sink产生的数据,但是只有等checkpoint完成后,其对应的数据才能 collect() 和 print() 返回。 这是为了保证exactly once语义。 在1.12里,同时支持了at least once 和 exactly once 语义。默认情况下是 at least once,collect() 和 print() 的结果可能有重复。 如果有兴趣可以参考pr:https://github.com/apache/flink/pull/12867

?????? flink 1.11 ddl sql ????PROCTIME()????????csv????

2020-07-23 文章 Asahi Lee
filesystemcsv?? filesystem ---- ??:

Re: flink row 类型

2020-07-23 文章 Dream-底限
hi、xiao cai 可以说一下思路吗,我没太懂 》》可以考虑把字段索引值保存下来再获取 Dream-底限 于2020年7月23日周四 下午2:56写道: > hi、Jingsong Li > 我查看了对应的api,并运行了demo测试,通过CallContext我可以拿到对应的字段类型,但是无法拿到对应的字段名称 > > >>在TypeInference中有input的type,这个type应该是包含字段信息的。 > > xiao cai 于2020年7月23日周四 下午2:19写道: > >> 可以考虑把字段索引值保存下来再获取 >> >> >> 原始邮件 >> 发件人:

flink 1.11 ddl 写mysql的问题

2020-07-23 文章 曹武
我使用fink 1.11.1 做cdc,发现一秒钟只能写100条左右数据到mysql,请问有优化方案,或者是其他的批量写入的方案建议嘛 代码如下: String sourceDdl =" CREATE TABLE debezium_source " + "( " + "id STRING NOT NULL, name STRING, description STRING, weight Double" + ") " + "WITH (" +

flink 1.11 executeSql查询kafka表print没有输出

2020-07-23 文章 wind.fly....@outlook.com
Hi, all: 本人当前使用flink版本1.11.0,但是在执行executeSql后,print时没有在console打印出结果(查看kafka是一直有数据产生的), sql如下: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings =

回复:application模式提交操作hive的任务相关疑问

2020-07-23 文章 Jun Zhang
我现在是改了源码,是把hivecatalog里面接收HiveConf参数的protected类型的构造方法改成了public类型,然后自己在代码里构造了HiveConf对象,传了一些必要的参数,比如metastore地址等。 BestJun -- 原始邮件 -- 发件人: Rui Li

Re: flink 1.11 executeSql查询kafka表print没有输出

2020-07-23 文章 godfrey he
1.11的 TableResult.collect() 和 TableResult.print() 方法在流模式下, 都是exactly once语义,需要配置checkpoint才能得到结果。 Best, Godfrey wind.fly@outlook.com 于2020年7月23日周四 下午7:22写道: > Hi, all: > > 本人当前使用flink版本1.11.0,但是在执行executeSql后,print时没有在console打印出结果(查看kafka是一直有数据产生的), > sql如下: > > >

回复: flink 1.11 executeSql查询kafka表print没有输出

2020-07-23 文章 wind.fly....@outlook.com
Hi,Godfrey: 加了checkpoint后确实可以了,能具体讲一下原理吗?print是在完成快照的时候顺便把结果输出了吗?或者有没有相关文档? Best, Junbao Zhang 发件人: godfrey he 发送时间: 2020年7月23日 19:24 收件人: user-zh 主题: Re: flink 1.11 executeSql查询kafka表print没有输出 1.11的 TableResult.collect() 和 TableResult.print() 方法在流模式下,

Re: application模式提交操作hive的任务相关疑问

2020-07-23 文章 Rui Li
有一种做法是把hive-site.xml打到作业jar包里,然后程序运行的时候再拷出来放到一个本地目录...我们也可以考虑以后为HiveCatalog添加一个接受HiveConf参数的构造器,这样对API的模式应该会更灵活 On Thu, Jul 23, 2020 at 3:51 PM Jun Zhang wrote: > 大家好: > >

Re: flink 1.11 ddl 写mysql的问题

2020-07-23 文章 godfrey he
你观察到有sink写不过来导致反压吗? 或者你调大flush interval试试,让每个buffer攒更多的数据 曹武 <14701319...@163.com> 于2020年7月23日周四 下午4:48写道: > 我使用fink 1.11.1 做cdc,发现一秒钟只能写100条左右数据到mysql,请问有优化方案,或者是其他的批量写入的方案建议嘛 > 代码如下: > String sourceDdl =" CREATE TABLE debezium_source " + > "( " + > "id

Re:Re: flink sql 1.11 kafka source with子句使用新参数,下游消费不到数据

2020-07-23 文章 Zhou Zach
当前作业有个sink connector消费不到数据,我找到原因了,根本原因是kafka中时间字段的问题,只是with子句新旧参数对相同的字段数据表现了不同的行为,kafka中的消息格式: {"uid":46,"sex":"female","age":11,"created_time":"2020-07-23T19:53:15.509Z"} 奇怪的是,在kafka_table DDL中,created_time 定义为TIMESTAMP(3),with使用老参数是可以成功运行的,with使用新参数,在IDEA中运行没有任何异常,提交到yarn上,会报异常:

自定义metrics reporter 如何不通过flink conf来注册并生效

2020-07-23 文章 Fisher Xiang
Hi all, 请问实现了 MetricReporter, CharacterFilter,Scheduled, Reporter 的自定义 customerReporter 如何*能在 代码env里面注册并实现metric上报*,要求不在flink conf.xml 文件里面配置 该customerReporter的信息? 需求:在自定义的source 和sink等算子里面计算处理成功,失败的数据条数并通过自定义reporter上报,并且该reporter需要是通用型的即 *适用于多个flink 任务*从而避开重复造轮子。 thx BR Fisher

Re: Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-23 文章 WeiXubin
感谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: application模式提交操作hive的任务相关疑问

2020-07-23 文章 Yang Wang
可以使用-Dyarn.ship-directories=/path/of/hiveConfDir把hive的配置ship到JobManager端,hiveConfDir默认会在 当前目录下,同时这个目录也会自动加入到classpath,不太清楚这样是否可以让hive正常加载到 Best, Yang Jun Zhang 于2020年7月23日周四 下午3:51写道: > 大家好: > >

Re: flink 1.11 ddl sql 添加PROCTIME()列,读取csv错误

2020-07-23 文章 Leonard Xu
Hi, Filesystem connector 支持streaming 写入,streaming 读取 还未支持,所以读取完了就停止。支持streaming 写入从文档上看[1]应该是有计划的 [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/filesystem.html

Re: flink 1.11 ddl sql 添加PROCTIME()列,读取csv错误

2020-07-23 文章 godfrey he
和hive结合下,filesystem是支持流式读取的,可以参考 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/hive/hive_streaming.html#streaming-reading Leonard Xu 于2020年7月23日周四 下午10:28写道: > Hi, > > Filesystem connector 支持streaming 写入,streaming 读取 > 还未支持,所以读取完了就停止。支持streaming

Flink??broadcast

2020-07-23 文章 ????????
Hi,all: flinkbroadcastStream. ??hive??map join??. .

Re: Is it possible to do state migration with checkpoints?

2020-07-23 文章 Sivaprasanna
Adding dev@ to get some traction. Any help would be greatly appreciated. Thanks. On Thu, Jul 23, 2020 at 11:48 AM Sivaprasanna wrote: > +user-zh@flink.apache.org > > A follow up question. I tried taking a savepoint but the job failed > immediately. It happens everytime I take a savepoint. The

Re: flink sql 1.11 kafka source with子句使用新参数,下游消费不到数据

2020-07-23 文章 Leonard Xu
Hi 你说的下游消费不到数据,这个下游是指当前作业消费不到数据吗? 正常应该不会的,可以提供个可复现代码吗? 祝好 Leonard Xu > 在 2020年7月23日,18:13,Zhou Zach 写道: > > Hi all, > > 根据文档https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position, >

Re: Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-23 文章 godfrey he
hi, 目前没有解决办法,insert job根据sink表名自动生成job name。 后续解法关注 https://issues.apache.org/jira/browse/FLINK-18545 Weixubin <18925434...@163.com> 于2020年7月23日周四 下午6:07写道: > Hi, > 我想请教下,使用streamExecutionEnv.execute("from kafka sink hbase") 是可以指定Job的名称。 > 而当改用streamTableEnv.executeSql(sql)的方式时,似乎无法定义Job的名称。 >

Re: Re: flink sqlUpdate,如何获取里面的字段,字段类型,with 等3个属性

2020-07-23 文章 godfrey he
1.10 也是支持的 Michael Ran 于2020年7月22日周三 下午9:07写道: > 1.tableEvn.from(xx).getSchema() 我确实通过这个拿到了schema,2.with > properties属性很重要 ,关系我自定义的一些参数设定。3.关于 catalog 这个东西,是不是只有1.11 > 版本才能从catalog 获取 with properties 哦? 1.10 you 有支持吗 > 在 2020-07-22 18:22:22,"godfrey he" 写道: > >tableEnv 中 可以通过 >

Re: flink sql 1.11 kafka source with子句使用新参数,下游消费不到数据

2020-07-23 文章 Leonard Xu
Hi 这是1.11里的一个 json format t的不兼容改动[1],目的是支持更多的 timestamp format 的解析,你可以把json-timestamp-format-standard 设置成 “ISO-8601”,应该就不用改动了。 Best Leonard Xu [1]

Re:Re: flink sql 1.11 kafka source with子句使用新参数,下游消费不到数据

2020-07-23 文章 Zhou Zach
Hi, 按照提示修改了,还是报错的: Query: val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) streamExecutionEnv.setStateBackend(new

??????flink1.11????????

2020-07-23 文章 Cayden chen
?? logback??appder( flink

Re: flink1.11日志上报

2020-07-23 文章 Dream-底限
hi Cayden chen、 也就是说你们日志上报的实现方式是实现自定义appder来实现是吧,这确实是一个不错的方式; 我先前看spark可以实现对应的listener用来实现日志上报,查看了一下flink api貌似也有对应listen,具体是实现哪一个还不知道,现在我们还处在一个功能整理阶段 Cayden chen <1193216...@qq.com> 于2020年7月24日周五 上午10:53写道: > 我们的获取逻辑是通过自定义 logback的appder( flink >

Re: flink1.11日志上报

2020-07-23 文章 zilong xiao
这个可以用配置文件实现,利用kafka appender将日志打到kafka中,然后自己去消费kafka处理即可,1.11中支持log4j2了,建议使用log4j2 Dream-底限 于2020年7月24日周五 上午10:50写道: > hi、 > > 我这面想实现一个日志上报的功能,就是flink任务启动后,让flink主动将当前任务日志打到外部存储系统,想问一下flink有对应的接口吗,具体要实现哪一个类哪 >

Re: flink解析kafka json数据

2020-07-23 文章 Jark Wu
哈哈,看来是一个很通用的需求啊。 本超同学已经在1.12 中支持了这个功能了, see https://issues.apache.org/jira/browse/FLINK-18002 Best, Jark On Fri, 24 Jul 2020 at 11:36, Dream-底限 wrote: > hi jark wu、 > 将解析错误数据直接打到日志里确实是比较通用的解决方案; > 我现在使用flink sql对接kafka > json数据的时候,发现对json数据的解析有一些局限性,即比如我有一条数据是jsonobject,但是我在定义flink sql >

Re: application模式提交操作hive的任务相关疑问

2020-07-23 文章 Jun Zhang
hi,Yang Wang: *谢谢你的建议,稍后我测试一下。* Yang Wang 于2020年7月24日周五 上午10:09写道: > > 可以使用-Dyarn.ship-directories=/path/of/hiveConfDir把hive的配置ship到JobManager端,hiveConfDir默认会在 > 当前目录下,同时这个目录也会自动加入到classpath,不太清楚这样是否可以让hive正常加载到 > > > Best, > Yang > > Jun Zhang 于2020年7月23日周四 下午3:51写道: > > > 大家好: > > > > >

flink1.11日志上报

2020-07-23 文章 Dream-底限
hi、 我这面想实现一个日志上报的功能,就是flink任务启动后,让flink主动将当前任务日志打到外部存储系统,想问一下flink有对应的接口吗,具体要实现哪一个类哪

Re: flink解析kafka json数据

2020-07-23 文章 Dream-底限
hi jark wu、 将解析错误数据直接打到日志里确实是比较通用的解决方案; 我现在使用flink sql对接kafka json数据的时候,发现对json数据的解析有一些局限性,即比如我有一条数据是jsonobject,但是我在定义flink sql connector数据类型的时候如果直接定义为string,会导致数据解析失败(当然,这个失败是正常的) 但是这会有一个局限性就是我没办法以一个string方式获取一个jsonobject数据(由于一些比较尴尬的原因就想以string方式获取jsonobject数据),查看代码发现这是jackson导致的获取失败,这个社区考虑兼容一下吗?

Re: flink row 类型

2020-07-23 文章 Jark Wu
你可以看看是不是可以把这个字段声明成 MAP,这样就可以 map['rule_key1'] 的方式通过字段名去获取了。 Best, Jark On Thu, 23 Jul 2020 at 15:47, Dream-底限 wrote: > hi xiao cai > > 我懂你的意思了,这确实是一种解决方式,不过这种方式有一个弊端就是每个这种功能都要开发对应的方法,我还是比较倾向于一个方法适用于一类场景,如果做不到只能每次有需求都重新开发了 > > xiao cai 于2020年7月23日周四 下午3:40写道: > > > Hi ,Dream > > > > > >

Re: flink-1.11 在 windows 下怎样启动

2020-07-23 文章 Yang Wang
社区已经不再维护windows版本的脚本了,建议你可以使用docker的方式[1]来运行 这样会更方便一些 [1]. https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/docker.html Best, Yang wangl...@geekplus.com.cn 于2020年7月23日周四 下午4:07写道: > > 我看 flink-1.11 发布包 bin 目录没有 windows 启动所需的 .bat 文件了。 > 那在 windows 下怎样启动呢? > > 谢谢 > 王磊 >

关于 sql-client

2020-07-23 文章 杨荣
Hi all, 请问: 1. 在 Embedded mode 下,支持 ClusterClient 进行 job 提交作业,进行分布式计算吗?在文档中没看到,跟着文档走,只启起了 Local 在本地作业,无法运用到生产环境。 2. GateWay mode 预计在那个版本 release?

回复:flink1.11日志上报

2020-07-23 文章 Cayden chen
应该没有api,官网推荐的也是log appder这种方式。用这种方式采集的日志是比较全的 ---原始邮件--- 发件人: "Dream-底限"

Re: 关于 sql-client

2020-07-23 文章 Harold.Miao
1 应该是可以的 主要是你要在flink-conf.yaml里面配置正确的 jobmanager.rpc.address 源码里面有加载主配置文件的逻辑 public LocalExecutor(URL defaultEnv, List jars, List libraries) { // discover configuration final String flinkConfigDir; try { // find the configuration directory flinkConfigDir =

Re: flink sql 1.11 kafka source with子句使用新参数,下游消费不到数据

2020-07-23 文章 Leonard Xu
Hi "2020-07-23T19:53:15.509Z” 是 RFC-3339 格式,这个格式是带zone的时间格式,对应的数据类型是 timestamp with local zone,这个应该在1.12里支持了[1] 1.10版本虽然是支持 RFC-3339 格式,但默认解析时区是有问题的,所以在1.11和1.12逐步中纠正了。 在1.11版本中,如果json数据是RFC-3339格式,你可以把这个字段当成string读出来,在计算列中用个UDF自己解析到需要的timestamp。 Best Leonard Xu [1]

回复:flink sql 1.11 kafka source with子句使用新参数,下游消费不到数据

2020-07-23 文章 Zhou Zach
Hi, 感谢详细答疑! | | Zhou Zach | | 邮箱:wander...@163.com | 签名由 网易邮箱大师 定制 在2020年07月24日 11:48,Leonard Xu 写道: Hi "2020-07-23T19:53:15.509Z” 是 RFC-3339 格式,这个格式是带zone的时间格式,对应的数据类型是 timestamp with local zone,这个应该在1.12里支持了[1] 1.10版本虽然是支持 RFC-3339 格式,但默认解析时区是有问题的,所以在1.11和1.12逐步中纠正了。

Re: flink 1.11 ddl 写mysql的问题

2020-07-23 文章 Jark Wu
kafka 数据源生产数据的速率是多少呢? 会不会数据源就是每秒100条数据呢。。。? Btw, 查看反压状态是一个比较好的排查方式。 On Thu, 23 Jul 2020 at 20:25, godfrey he wrote: > 你观察到有sink写不过来导致反压吗? > 或者你调大flush interval试试,让每个buffer攒更多的数据 > > 曹武 <14701319...@163.com> 于2020年7月23日周四 下午4:48写道: > > > 我使用fink 1.11.1