解析kafka的mysql binlog问题

2020-07-27 文章 air23
你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢? private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + " `data` VARCHAR , " + " `table` VARCHAR " + ") WITH (" + " 'connector' = 'kafka'," + " 'topic' = 'order_source'," + " 'properties.bootstrap.servers' =

Re: 解析kafka的mysql binlog问题

2020-07-27 文章 Jark Wu
抱歉,还是没有看到附件。 如果是文本的话,你可以直接贴到邮件里。 On Mon, 27 Jul 2020 at 19:22, air23 wrote: > 我再上传一次 > > 在2020年07月27日 18:55,Jark Wu 写道: > > Hi, > 你的附件好像没有上传。 > > On Mon, 27 Jul 2020 at 18:17, air23 wrote: > > > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* > > > > private static final

Re:Re: kafka-connect json格式适配问题?

2020-07-27 文章 RS
Hi, 改了下sql,遇到一个新的问题: Caused by: org.apache.flink.table.planner.codegen.CodeGenException: Unsupported cast from 'ROW<`EXPR$0` BIGINT NOT NULL, `EXPR$1` STRING, `EXPR$2` TIMESTAMP(3) *ROWTIME*> NOT NULL' to 'ROW<`total_count` BIGINT, `username` STRING, `update_time` TIMESTAMP(6)>'.

Re:回复:解析kafka的mysql binlog问题

2020-07-27 文章 RS
Hi, 附近应该是收不到的,包括图片啥的 只能回复纯文本,贴代码,如果真的需要图片的话,可以上传到其他的网站上,然后给个连接跳转过去 在 2020-07-27 19:21:51,"air23" 写道: 我再上传一次 在2020年07月27日 18:55,Jark Wu 写道: Hi, 你的附件好像没有上传。 On Mon, 27 Jul 2020 at 18:17, air23 wrote: > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* > > private

Re: Blink的Batch模式的并行度问题

2020-07-27 文章 jun su
hi, 如果底层是FileInputFormat ,默认就是1个并行度, 这个参数我尝试了并不起作用, 看代码是创建了一个SingleOutputStreamOperator , 感觉得重写下我使用的OrcInputFormat , 让他不继承FileInputFormat , 像源码里的HiveInputFormat一样 Caizhi Weng 于2020年7月27日周一 下午5:31写道: > Hi, > > 可以配置 table.exec.resource.default-parallelism 为需要的并发。详见文档[1] > > [1] > >

Re: sql-client 的jdbc表出错

2020-07-27 文章 杨荣
在 yarn 上提交 job 可以,不代表通过 sql-client 可以,他们使用的是不同的脚本和配置。前者跟 bin/flink, bin/yarn-session.sh, conf/flink-conf.yaml 有关,后跟 bin/sql-client.sh, conf/sql-client-defaults.yaml 有关。 你可以理一下这个逻辑,或者给出你的相关配置文件,以及 sql-client.sh 启动完整命令。 op <520075...@qq.com> 于2020年7月27日周一 下午5:29写道: > 你好, >

Flink Sql 问题

2020-07-27 文章 air23
你好

flink1.11.0 执行sqlQuery时报NullPointException

2020-07-27 文章 wind.fly....@outlook.com
Hi,all: 本人正在为公司之前基于flink1.10的gateway升级flink版本到1.11,用的hive catalog,建表后,执行sqlQuery方法时报NullPointException,希望给出排错建议,具体报错信息如下: Caused by: java.lang.NullPointerException at java.util.Objects.requireNonNull(Objects.java:203) at

回复:解析kafka的mysql binlog问题

2020-07-27 文章 air23
我再上传一次 在2020年07月27日 18:55,Jark Wu 写道: Hi, 你的附件好像没有上传。 On Mon, 27 Jul 2020 at 18:17, air23 wrote: > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* > > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + > " `data` VARCHAR , " + > "

Re: kafka-connect json格式适配问题?

2020-07-27 文章 Jark Wu
Hi, 你需要在 DDL 和 query 上都补上 schema 和 payload: CREATE TABLE print_table \ (`schema` STRING, `payload` ROW) \ WITH (\ 'connector' = 'kafka', \ 'topic' = 'test_out', \ 'properties.bootstrap.servers' = '127.0.0.1:9092', \ 'sink.partitioner' = 'round-robin', \ 'format' = 'json') -- DML 上可以用常量写死

Re: 解析kafka的mysql binlog问题

2020-07-27 文章 Jark Wu
Hi, 你的附件好像没有上传。 On Mon, 27 Jul 2020 at 18:17, air23 wrote: > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* > > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + > " `data` VARCHAR , " + > " `table` VARCHAR " + > ") WITH (" + >

回复: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-27 文章 wind.fly....@outlook.com
补充一下,执行的sql如下: select order_no, order_time from x.ods.ods_binlog_test_trip_create_t_order_1 发件人: wind.fly@outlook.com 发送时间: 2020年7月27日 18:49 收件人: user-zh@flink.apache.org 主题: flink1.11.0 执行sqlQuery时报NullPointException Hi,all:

Re: kafka-connect json格式适配问题?

2020-07-27 文章 Leonard Xu
> 我把TIMESTAMP(6)修改为TIMESTAMP(3)之后,就没有报错了,所以Flink里面窗口的时间精度只是3位吗? 窗口里的时间用来做time attribute 列了吧,只能是TIMESTAMP(3), 其TIMESTAMP字段Flink是可以支持到TIMESTAMP(9)的 祝好 Leonard > 在 2020年7月27日,20:05,RS 写道: > > Hi, > 改了下sql,遇到一个新的问题: > Caused by: org.apache.flink.table.planner.codegen.CodeGenException: >

Re:Re: kafka-connect json格式适配问题?

2020-07-27 文章 RS
Hi, 啊,发现不太对,`schema`需要一个dict,不是STRING。请教下这个如何用SQL定义出来? Thanks 在 2020-07-27 17:49:18,"Jark Wu" 写道: >Hi, > >你需要在 DDL 和 query 上都补上 schema 和 payload: > >CREATE TABLE print_table \ >(`schema` STRING, `payload` ROWupdate_time TIMESTAMP(6)>) \ >WITH (\ >'connector' = 'kafka', \ >'topic' = 'test_out',

?????? sql-client ??jdbc??????

2020-07-27 文章 op
---- ??: "user-zh"

Re: Blink Planner构造Remote Env

2020-07-27 文章 jun su
hi Jark, 抱歉这么晚回复邮件, 在flink 1.11.0版本上按照你的方式试验了下, 创建StreamTableEnvironmentImpl的方式参照了StreamTableEnvironmentImpl#create, 只是删除了方法中检查模式的代码 settings.isStreamingMode() , 结果报以下错误: Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at

Re: sql-client 的jdbc表出错

2020-07-27 文章 杨荣
hi, 你能确定你的 class path 下有 mysql-connector-java-5.1.38.jar 依赖吗?请在运行时确认下这一点。 op <520075...@qq.com> 于2020年7月27日周一 下午2:45写道: > 您好,我创建了一个jdbc的表 > > > CREATE TABLE mvp_dim_anticheat_args_all ( > id BIGINT, > dt STRING, > cnt_7d INT, > cnt_30d INT, > PRIMARY KEY (id) NOT ENFORCED > ) WITH ( >

Re: Blink Planner构造Remote Env

2020-07-27 文章 jun su
是依赖问题,解决了 jun su 于2020年7月27日周一 下午2:29写道: > hi Jark, > > 抱歉这么晚回复邮件, 在flink 1.11.0版本上按照你的方式试验了下, > 创建StreamTableEnvironmentImpl的方式参照了StreamTableEnvironmentImpl#create, > 只是删除了方法中检查模式的代码 settings.isStreamingMode() , 结果报以下错误: > > Caused by:

Blink的Batch模式的并行度问题

2020-07-27 文章 jun su
hi all, Flink 目前的blink table planner batch mode (读hdfs上的orc文件)只支持StreamTableSource和LookupableTableSource, 但是StreamTableSource的并行度默认应该是1 , 底层是ContinuousFileMonitoringFunction , 那么如何能扩大并行度来优化性能呢? -- Best, Jun Su

Re:flink 聚合 job 重启问题

2020-07-27 文章 RS
伪代码发下看看?看下jdbc sink的配置,是不是支持删除记录,更新的时候旧记录被删除了 在 2020-07-27 11:33:31,"郑斌斌" 写道: >hi all : > > 请教个问题,我通过程序拉取kafka消息后,注册为flink流表。然后执行sql: "select user_id, count(*)cnt > from 流表", 将结果写入到mysql 聚合表中(SINK组件为:flink1.11版本JdbcUpsertTableSink)。 >但问题是,每次JOB重启后,之前mysql

Hbase connector????????

2020-07-27 文章 op
habse??family1 INSERT INTO hTable SELECT rowkey, ROW(null,f1q1) FROM T;

Re: 【flink sql】flink sql insert into插入语句的问题

2020-07-27 文章 Caizhi Weng
Hi, Flink 目前的确不支持这个语法... 我已经创建了一个 issue[1],可以在那里跟踪这个 feature 的进展。 [1] https://issues.apache.org/jira/browse/FLINK-18726 于2020年7月27日周一 上午11:36写道: > 测试Flink版本:1.11.0 > > > > Flink sql支持这种语法插入吗,在插入时指定具体的字段顺序和要插入的列 > > Insert into tableName(col1[,col2]) select col1[,col2] > > > > 目前通过测试发现了以下问题 >

Re: sql-client 的jdbc表出错

2020-07-27 文章 Caizhi Weng
Hi, mysql-connector-java-5.1.38.jar 应该已经包含了 com.mysql.jdbc.Driver 才对;Flink 是以什么模式运行的呢?如果是 standalone session,在 Flink 的 lib 下添加 jar 包之后是否重启过 session 集群呢?另外是否所有的 worker 都添加了 jar 包呢?如果能打出完整的错误栈会更好。 op <520075...@qq.com> 于2020年7月27日周一 下午2:45写道: > 您好,我创建了一个jdbc的表 > > > CREATE TABLE

Re:Re: Re: Could not find any factory for identifier 'kafka'

2020-07-27 文章 RS
Hi, 1. 好的,学习了 2. 确实,部分Flink依赖调整为provided,打包测试也可以正常执行,但是flink-walkthrough-common_2.11这种包在Flink的lib中没有看到,还是打包进去了 在 2020-07-27 11:42:50,"Caizhi Weng" 写道: >Hi, > >Flink 的 TableFactory 利用了 Java 的服务发现功能,所以需要这两个文件。需要确认 jar-with-dependencies >是否能把这些资源文件打进去。 > >另外为什么需要把 Flink 的依赖也打在大包里呢?因为 Flink 本身的

kafka-connect json格式适配问题?

2020-07-27 文章 RS
hi, kafka->Flink->kafka->mysql Flink用sql处理之后数据写入kafka里面,格式为json,再用kafka-connect-jdbc将数据导出到mysql中。 使用kafka-connect是方便数据同时导出到其他存储 Flink定义输出表结构: CREATE TABLE print_table \ (total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \ WITH (\ 'connector' = 'kafka', \ 'topic' = 'test_out',

Re: Blink的Batch模式的并行度问题

2020-07-27 文章 Caizhi Weng
Hi, 可以配置 table.exec.resource.default-parallelism 为需要的并发。详见文档[1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism jun su 于2020年7月27日周一 下午3:50写道: > hi all, > > Flink 目前的blink table planner batch mode >

sql-client ??jdbc??????

2020-07-27 文章 op
??jdbc CREATE TABLE mvp_dim_anticheat_args_all ( id BIGINT, dt STRING, cnt_7d INT, cnt_30d INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'driver'='com.mysql.jdbc.Driver', 'url' = 'jdbc:mysql://localhost:3306/huyou_oi', 'table-name' =

Re: Flink 1.11 submit job timed out

2020-07-27 文章 Yang Wang
建议先配置heartbeat.timeout的值大一些,然后把gc log打出来 看看是不是经常发生fullGC,每次持续时间是多长,从你目前提供的log看,进程内JM->RM都会心跳超时 怀疑还是和GC有关的 env.java.opts.jobmanager: -Xloggc:/jobmanager-gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=512M Best, Yang SmileSmile 于2020年7月27日周一 下午1:50写道: > Hi,Yang

?????? sql-client ??jdbc??????

2020-07-27 文章 op
?? yarn??,sql-client??1.11.0 ---- ??: "user-zh"

回复:flink row 类型

2020-07-27 文章 kcz
哇 这个方式很取巧了 好机智 我之前就是一直索引取值 学习一下 -- 原始邮件 -- 发件人: Jark Wu

kafka avro格式sink报错,NoClassDefFoundError: Could not initialize class org.apache.avro.SchemaBuilder

2020-07-27 文章 RS
Hi, 版本:Flink-1.11.1 任务启动模式:standalone Flink任务编译的jar的maven中包含了flink-avro,jar-with-dependencies编译的 org.apache.flink flink-avro 1.11.1 编译出来的jar也包含了这个class 我看官网上说明 Flink has extensive built-in support for Apache Avro。感觉默认是支持avro的 1. 直接启动的话,会报错

Re: flink1.11.1启动问题

2020-07-27 文章 Xintong Song
建议确认一下 Yarn 的配置 “yarn.scheduler.minimum-allocation-mb” 在 Yarn RM 和 Flink JM 这两台机器上是否一致。 Yarn 会对 container request 做归一化。例如你请求的 TM container 是 1728m (taskmanager.memory.process.size) ,如果 minimum-allocation-mb 是 1024m,那么实际得到的 container 大小必须是 minimum-allocation-mb 的整数倍,也就是 2048m。Flink 会去获取 Yarn

flink1.11.1使用Table API Hive方言的executSql报错

2020-07-27 文章 shimin huang
Hi,all: 本人基于Flink1.11.1的table API使用Hive方言,调用executSql方法后报错,堆栈信息如下: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql at org.apache.flink.client.program.PackagedProgram.callMainMethod( PackagedProgram.java:302)

Re:Re: 解析kafka的mysql binlog问题

2020-07-27 文章 air23
你好 测试代码如下 private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + " `data` VARCHAR , " + " `table` VARCHAR " + ") WITH (" + " 'connector' = 'kafka'," + " 'topic' = 'source_databases'," + " 'properties.bootstrap.servers' = '***'," + " 'properties.group.id' = 'real1'," + " 'format'

Re: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-27 文章 godfrey he
hi 能给出详细的schema信息吗? wind.fly@outlook.com 于2020年7月27日周一 下午7:02写道: > 补充一下,执行的sql如下: > > select order_no, order_time from > x.ods.ods_binlog_test_trip_create_t_order_1 > > > 发件人: wind.fly@outlook.com > 发送时间: 2020年7月27日 18:49 > 收件人: user-zh@flink.apache.org

回复: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-27 文章 wind.fly....@outlook.com
schema信息如下: CREATE TABLE x.ods.ods_binlog_test_trip_create_t_order_1 ( _type STRING, order_no STRING, order_time STRING, dt as TO_TIMESTAMP(order_time), proctime as PROCTIME(), WATERMARK FOR dt AS dt - INTERVAL '5' SECOND ) WITH ( 'connector.type' = 'kafka',

Re: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-27 文章 godfrey he
你们是否在多线程环境下使用 TableEnvironment ? TableEnvironment 不是线程安全的,多线程情况使用可能出现一些莫名其妙的问题。 godfrey he 于2020年7月28日周二 上午9:55写道: > hi 能给出详细的schema信息吗? > > wind.fly@outlook.com 于2020年7月27日周一 > 下午7:02写道: > >> 补充一下,执行的sql如下: >> >> select order_no, order_time from >>

回复: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-27 文章 wind.fly....@outlook.com
不是多线程同时操作一个tableEnvironment,每执行一次都会创建一个TableEnvironment 发件人: godfrey he 发送时间: 2020年7月28日 9:58 收件人: user-zh 主题: Re: flink1.11.0 执行sqlQuery时报NullPointException 你们是否在多线程环境下使用 TableEnvironment ? TableEnvironment 不是线程安全的,多线程情况使用可能出现一些莫名其妙的问题。 godfrey he

Flink-1.10 on yarn Taskmanager启动参数问题

2020-07-27 文章 Roc Marshal
Hi, all. 请问Flink-1.10 on yarn Taskmanager启动的jvm GC 回收器参数默认信息是G1吗? 基本集群环境:hadoop-2.7.5、flink-1.10、jdk-1.8_61,其中jvm相关参数均未进行显示设置。 谢谢。 Best, Roc Marshal.

Re: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-27 文章 godfrey he
我的怀疑点还是多线程引起的。 你能具体描述一下你们gateway的行为吗? 是一个web server? 另外,你可以在table env执行query前加上 RelMetadataQueryBase.THREAD_PROVIDERS .set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE())); 这句话临时fix。 wind.fly@outlook.com 于2020年7月28日周二 上午11:02写道: >

Re: flink1.11.1使用Table API Hive方言的executSql报错

2020-07-27 文章 godfrey he
你的包是完整的flink-1.11.1的包吗? 例如 check一下 ClusterClientJobClientAdapter 这个类是否继承 CoordinationRequestGateway ? shimin huang 于2020年7月28日周二 上午11:21写道: > Hi,all: > 本人基于Flink1.11.1的table API使用Hive方言,调用executSql方法后报错,堆栈信息如下: > org.apache.flink.client.program.ProgramInvocationException: The main method >