来自chenxuying的邮件

2021-06-17 文章 chenxuying

flinksql当一个job的kafka连接信息错误时,会导致整个session集群无法正常发布任务

2021-04-25 文章 chenxuying
环境: flinksql 1.12.2 k8s session模式 描述: 当kafka 端口错误,过一段时间会有如下报错: 2021-04-25 16:49:50 org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition filebeat_json_install_log-3 could be determined 当kafka ip错误,过一段时间会有如下报错: 2021-04-25 20:12:53

请问flink1.11版本如何设置checkpoint的默认保存个数

2020-10-23 文章 chenxuying
我看官方文档[1]应该是设置state.checkpoints.num-retained , 默认是1, 但是设置了没有效果, 官方说默认是1, 但是我发现好像是10 , 同时我也设置了其他的属性,比如 execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION 是可行,所以我的设置应该没有什么问题

Re:Re: Re: flink1.11加载外部jar包进行UDF注册

2020-10-16 文章 chenxuying
1使用系统类加载器的时候,本身作业包开放给外部UDF jar包实现的接口会报ClassNotFound异常 2线程上下文类加载器是什么 不太明白这两点,可以写个代码例子看看吗 在 2020-10-15 19:47:20,"amen...@163.com" 写道: >追加问题,在使用线程上下文类加载器的时候,数据会重复发送三条,这是因为添加pipeline.classpaths的缘故吗? >那这种设置env的方式有可能还会造成其他什么问题? > >best, >amenhub > >发件人: amen...@163.com >发送时间: 2020-10-15 19:22 >收件人:

flink1.11.2webui界面中的任务详情页面获取不到任务的received和sent数据详情

2020-10-13 文章 chenxuying
集群是local Standalone模式,任务可以正常的运行, sink是print,能在Stdout看到数据输出, 但是在任务详情页面没有Bytes Received, Records Received, Bytes Sent , Records Sent等实时数据 都是0

flink1.11.2 在k8s上部署,如何启动history server

2020-10-10 文章 chenxuying
flink1.11.2 在k8s上部署,如何启动history server 之前1.10的yaml里面可以加命令,但是1.11的yaml是通过docker-entrypoint.sh 好像没发现这个入口脚本没有对应的history server参数

Re:flinksql注册udtf使用ROW类型做为输出输出时出错

2020-09-30 文章 chenxuying
上面最后说的splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), DataTypes.BIGINT()])需要改成这个地方splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()])udtf的第三个参数, 好像只要是能跟sink的字段类型对应就能运行, 但是第二个参数并不能跟source字段对应却能运行就感觉有点奇怪 在 2020-09-30 19:07:06,"chenx

flinksql注册udtf使用ROW类型做为输出输出时出错

2020-09-30 文章 chenxuying
版本: pyflink==1.0 apache-flink==1.11.2 代码如下: env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env) t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true') class

flink1.11.2基于官网在k8s上部署是正常的,但是加了volume配置之后报错Read-only file system

2020-09-28 文章 chenxuying
我在使用k8s部署的时候也是按照官网的方式[1],是正常使用的, 然后后面加了volume配置 { ... "spec": { ... "template": { ... "spec": { "volumes": [ ... { "name": "libs-volume", "hostPath": { "path":

Re:Re: flink使用在docker环境中部署出现的两个问题

2020-09-28 文章 chenxuying
>所以你mount进去会被修改掉 > >[1]. >https://github.com/apache/flink-docker/blob/dev-1.11/docker-entrypoint.sh > > >Best, >Yang > >chenxuying 于2020年9月27日周日 下午7:56写道: > >> 根据官网[1]使用docker部署flink,session cluster模式 >> 环境win10+docker+flink1.11.2 >> cmd命令 >>

flink使用在docker环境中部署出现的两个问题

2020-09-27 文章 chenxuying
根据官网[1]使用docker部署flink,session cluster模式 环境win10+docker+flink1.11.2 cmd命令 docker run ^ -d^ --rm ^ --name=jobmanager ^ --hostname=jobmanager ^ --network flink-network ^ --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" ^ -p 28081:8081 ^ flink:1.11.2-scala_2.11

flinksql接收到字段值格式为2020-09-23T20:58:24+08:00,如何转成TIMESTAMP

2020-09-23 文章 chenxuying
flinksql 版本是1.11.2 source接收到字段是字符串类型的时间 CREATE TABLE sourceTable ( `time` STRING ) WITH( ... ); sink如下 CREATE TABLE sinktable ( `time1` STRING, `time` TIMESTAMP(3) ) WITH ( 'connector' = 'print' ); insert语句,不知道怎么正确修改TO_TIMESTAMP默认的格式 insert into sinktable select

Re:Re: 使用flinksql时 jdbc connector参数不起作用

2020-09-21 文章 chenxuying
好的, 明白 在 2020-09-17 20:29:09,"Jark Wu" 写道: >> sink.buffer-flush.max-rows = '0' 导致每接收一条数据就插入数据库 > >这个应该是个 bug,我建了个 issue:https://issues.apache.org/jira/browse/FLINK-19280 > >Best, >Jark > >On Thu, 17 Sep 2020 at 18:15, chenxuying wrote: > >&g

使用flinksql时 jdbc connector参数不起作用

2020-09-17 文章 chenxuying
环境是flink1.11.2+idea sql: CREATE TABLE sourceTable ( platform STRING ,game_id bigint ) WITH ( ... ); CREATE TABLE sinktable ( platform STRING ,game_id bigint ) WITH ( 'connector' = 'jdbc', 'url' = '', 'table-name' = '', 'driver' = 'com.mysql.jdbc.Driver',

Re:Re: ​请问在flinksql中如何使用聚合函数 LISTAGG

2020-08-12 文章 chenxuying
好的 , 原来是bug , 感谢回答 在 2020-08-12 21:32:40,"Benchao Li" 写道: >看起来是一个已知bug[1],已经修复,但是还没有发布。 > >[1] https://issues.apache.org/jira/browse/FLINK-18862 > >chenxuying 于2020年8月12日周三 下午9:25写道: > >> 版本: >> flinksql 1.11.0 >> 需求: &g

​请问在flinksql中如何使用聚合函数 LISTAGG

2020-08-12 文章 chenxuying
版本: flinksql 1.11.0 需求: 需要实现多行聚合成一行功能 代码如下: environmentSettings = EnvironmentSettings.new_instance().in_streaming_mode().build() t_env = StreamTableEnvironment.create(environment_settings = environmentSettings)

Re:Re: 有界数据中batch和stream的区别

2020-08-04 文章 chenxuying
不知道您是否知道原因 在 2020-08-04 12:11:32,"godfrey he" 写道: >逻辑上批产生的结果是Table,流产生的结果是Changelog。 >你在例子中Table的结果和changelog的结果是一样的,所以你感觉差不多。 >最简单的方式可以将query改为带group by的,再看结果的差异。 >更多关于Table和Changelog的概念可以参考 [1] > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/z

有界数据中batch和stream的区别

2020-08-03 文章 chenxuying
hi : flink table sql 1.11.0 在EnvironmentSettings中可以设置BatchMode或StreamingMode EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance() //.inStreamingMode() .inBatchMode() .build(); 如果使用mysql作为source , 使用这两种模式都可以运行 , 效果都一样 ,

Re:Re: flinksql insert overwrite 报错 INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-02 文章 chenxuying
cs-master/dev/table/connectors/jdbc.html#idempotent-writes> > > >> 在 2020年8月3日,10:33,chenxuying 写道: >> >> 你好,我这边只是做一个尝试 , 因为之前使用了insert into + 主键做到更新db记录的操作 , 然后现在看到INSERT >> OVERWRITE的语法就试了一下 , 原来OVERWRITE目前只支持 Filesystem connector 和 Hive table >> 然后还想问下在使

Re:Re: flinksql insert overwrite 报错 INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-02 文章 chenxuying
://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling> > >> 在 2020年8月1日,19:20,chenxuying 写道: >> >> Hello >> 请问在flink 1.11.0文档[1]中发现可以使用时可以使用INSERTOVERWRITE... 尝试了一下,但是在执行语句"insert >> ove

flinksql insert overwrite 报错 INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-01 文章 chenxuying
bleSink to implement SupportsOverwrite interface. 是得自定义connector吗,实现DynamicTableSink? 祝好 chenxuying [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax

Re:Re: flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题

2020-07-31 文章 chenxuying
hi ok,谢谢,懂了哈哈 在 2020-07-31 21:27:02,"Leonard Xu" 写道: >Hello > >> 在 2020年7月31日,21:13,chenxuying 写道: >> >> 但是不太明白 "使用老的option参数还是需要根据query推导主键" 这里话是什么意思,需要怎么做 > >简单来讲,如果使用的是老版本(1.10)的option参数,代码执行的路径就和1.10版本一样的,1.10版本里是不支持定义 PRIMARY

Re:Re: flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题

2020-07-31 文章 chenxuying
谢谢回答 使用新属性可以 成功修改记录 , 但是不太明白 "使用老的option参数还是需要根据query推导主键" 这里话是什么意思,需要怎么做 在 2020-07-31 16:46:41,"Leonard Xu" 写道: >Hi, chenxuying > >看你还是用的还是 " 'connector.type' = 'jdbc', …. " >,这是老的option,使用老的option参数还是需要根据query推导主键, >需要使用新的属性[1]:&q

flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题

2020-07-31 文章 chenxuying
hi 我使用的flink 1.11.0版本 代码如下 StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv); tableEnvironment.executeSql(" " + " CREATE TABLE mySource ( " + " a bigint, " + " b bigint " + "

Re:Re: 官方pyflink 例子的执行问题

2020-07-20 文章 chenxuying
>memory了,table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", >'80m')。你可以参考文档上的例子,以及对应的note说明[1] > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#scalar-functions > >Best, >Xingb

官方pyflink 例子的执行问题

2020-07-20 文章 chenxuying
官方例子: https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html 按照例子写了程序,也安装了pyflink | python -m pip install apache-flink | 代码: | from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, DataTypes from pyflink.table.descriptors import

flink REST API是否支持-C参数

2020-06-24 文章 chenxuying
目前使用的是flink 1.10.0 背景: REST API有一个提交job的接口 接口 /jars/:jarid/run 参数entryClass,programArgs,parallelism,jobId,allowNonRestoredState,savepointPath 如果使用命令行方式提交job flink run -C file:///usr/local/soft/flink/my-function-0.1.jar -c cn.xuying.flink.table.sql.ParserSqlJob

Re:Re: Re: FlinkSQL 是否支持类似临时中间表的概念

2020-06-24 文章 chenxuying
你好,请问下,my_parse是个udf吧 然后有没有什么操作可以使用udtf解析出多个字段 , 这些字段直接就是source表的字段 , 然后选出时间字段定义 watermark , 类似如下 CREATE TABLE sourceTable( request_uri STRING, (column_1,column_2,heart_time) as udtf_parse(request_uri) )with(..); 哈哈,不知道有没有这样的语法 在 2020-06-24 12:24:46,"Jark Wu" 写道: >你可以在 DDL

Re:Re: Re: flink启动任务的方式

2020-04-21 文章 chenxuying
不是个参数列表么,多传几个。 > >或把依赖提前部署到${FLINK_HOME}/plugins里 > >chenxuying 于2020年4月21日周二 下午3:36写道: > >> 这个是可以 , 不过我们的需求不允许打FatJar >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>

Re:Re: flink启动任务的方式

2020-04-21 文章 chenxuying
这个是可以 , 不过我们的需求不允许打FatJar 在 2020-04-21 15:27:48,"Arnold Zai" 写道: >打个FatJar > >chenxuying 于2020年4月21日周二 下午2:47写道: > >> 请问下目前flink的启动方式有哪些 >> 1 通过命令行来执行 >> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c >> cn.xxx.flink

flink启动任务的方式

2020-04-21 文章 chenxuying
请问下目前flink的启动方式有哪些 1 通过命令行来执行 flink run -C file:///usr/local/soft/flink/function-0.1.jar -c cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar 2通过自带的webui页面上传jar , submit jar 3 通过代码 createRemoteEnvironment 目前主要使用的是,通过代码请求rest api http请求来实现上传jar,跑任务, 但是目前遇到的问题是,通过rest api