你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?
private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
" `data` VARCHAR , " +
" `table` VARCHAR " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'order_source'," +
" 'properties.bootstrap.servers' =
抱歉,还是没有看到附件。
如果是文本的话,你可以直接贴到邮件里。
On Mon, 27 Jul 2020 at 19:22, air23 wrote:
> 我再上传一次
>
> 在2020年07月27日 18:55,Jark Wu 写道:
>
> Hi,
> 你的附件好像没有上传。
>
> On Mon, 27 Jul 2020 at 18:17, air23 wrote:
>
> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
> >
> > private static final
Hi,
改了下sql,遇到一个新的问题:
Caused by: org.apache.flink.table.planner.codegen.CodeGenException: Unsupported
cast from 'ROW<`EXPR$0` BIGINT NOT NULL, `EXPR$1` STRING, `EXPR$2` TIMESTAMP(3)
*ROWTIME*> NOT NULL' to 'ROW<`total_count` BIGINT, `username` STRING,
`update_time` TIMESTAMP(6)>'.
Hi,
附近应该是收不到的,包括图片啥的
只能回复纯文本,贴代码,如果真的需要图片的话,可以上传到其他的网站上,然后给个连接跳转过去
在 2020-07-27 19:21:51,"air23" 写道:
我再上传一次
在2020年07月27日 18:55,Jark Wu 写道:
Hi,
你的附件好像没有上传。
On Mon, 27 Jul 2020 at 18:17, air23 wrote:
> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>
> private
hi,
如果底层是FileInputFormat ,默认就是1个并行度, 这个参数我尝试了并不起作用,
看代码是创建了一个SingleOutputStreamOperator , 感觉得重写下我使用的OrcInputFormat ,
让他不继承FileInputFormat , 像源码里的HiveInputFormat一样
Caizhi Weng 于2020年7月27日周一 下午5:31写道:
> Hi,
>
> 可以配置 table.exec.resource.default-parallelism 为需要的并发。详见文档[1]
>
> [1]
>
>
在 yarn 上提交 job 可以,不代表通过 sql-client 可以,他们使用的是不同的脚本和配置。前者跟 bin/flink,
bin/yarn-session.sh, conf/flink-conf.yaml 有关,后跟 bin/sql-client.sh,
conf/sql-client-defaults.yaml 有关。
你可以理一下这个逻辑,或者给出你的相关配置文件,以及 sql-client.sh 启动完整命令。
op <520075...@qq.com> 于2020年7月27日周一 下午5:29写道:
> 你好,
>
你好
Hi,all:
本人正在为公司之前基于flink1.10的gateway升级flink版本到1.11,用的hive
catalog,建表后,执行sqlQuery方法时报NullPointException,希望给出排错建议,具体报错信息如下:
Caused by: java.lang.NullPointerException
at java.util.Objects.requireNonNull(Objects.java:203)
at
我再上传一次
在2020年07月27日 18:55,Jark Wu 写道:
Hi,
你的附件好像没有上传。
On Mon, 27 Jul 2020 at 18:17, air23 wrote:
> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> "
Hi,
你需要在 DDL 和 query 上都补上 schema 和 payload:
CREATE TABLE print_table \
(`schema` STRING, `payload` ROW) \
WITH (\
'connector' = 'kafka', \
'topic' = 'test_out', \
'properties.bootstrap.servers' = '127.0.0.1:9092', \
'sink.partitioner' = 'round-robin', \
'format' = 'json')
-- DML 上可以用常量写死
Hi,
你的附件好像没有上传。
On Mon, 27 Jul 2020 at 18:17, air23 wrote:
> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> " `table` VARCHAR " +
> ") WITH (" +
>
补充一下,执行的sql如下:
select order_no, order_time from x.ods.ods_binlog_test_trip_create_t_order_1
发件人: wind.fly@outlook.com
发送时间: 2020年7月27日 18:49
收件人: user-zh@flink.apache.org
主题: flink1.11.0 执行sqlQuery时报NullPointException
Hi,all:
> 我把TIMESTAMP(6)修改为TIMESTAMP(3)之后,就没有报错了,所以Flink里面窗口的时间精度只是3位吗?
窗口里的时间用来做time attribute 列了吧,只能是TIMESTAMP(3),
其TIMESTAMP字段Flink是可以支持到TIMESTAMP(9)的
祝好
Leonard
> 在 2020年7月27日,20:05,RS 写道:
>
> Hi,
> 改了下sql,遇到一个新的问题:
> Caused by: org.apache.flink.table.planner.codegen.CodeGenException:
>
Hi,
啊,发现不太对,`schema`需要一个dict,不是STRING。请教下这个如何用SQL定义出来?
Thanks
在 2020-07-27 17:49:18,"Jark Wu" 写道:
>Hi,
>
>你需要在 DDL 和 query 上都补上 schema 和 payload:
>
>CREATE TABLE print_table \
>(`schema` STRING, `payload` ROWupdate_time TIMESTAMP(6)>) \
>WITH (\
>'connector' = 'kafka', \
>'topic' = 'test_out',
----
??:
"user-zh"
hi Jark,
抱歉这么晚回复邮件, 在flink 1.11.0版本上按照你的方式试验了下,
创建StreamTableEnvironmentImpl的方式参照了StreamTableEnvironmentImpl#create,
只是删除了方法中检查模式的代码 settings.isStreamingMode() , 结果报以下错误:
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
Cannot instantiate user function.
at
hi,
你能确定你的 class path 下有 mysql-connector-java-5.1.38.jar 依赖吗?请在运行时确认下这一点。
op <520075...@qq.com> 于2020年7月27日周一 下午2:45写道:
> 您好,我创建了一个jdbc的表
>
>
> CREATE TABLE mvp_dim_anticheat_args_all (
> id BIGINT,
> dt STRING,
> cnt_7d INT,
> cnt_30d INT,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>
是依赖问题,解决了
jun su 于2020年7月27日周一 下午2:29写道:
> hi Jark,
>
> 抱歉这么晚回复邮件, 在flink 1.11.0版本上按照你的方式试验了下,
> 创建StreamTableEnvironmentImpl的方式参照了StreamTableEnvironmentImpl#create,
> 只是删除了方法中检查模式的代码 settings.isStreamingMode() , 结果报以下错误:
>
> Caused by:
hi all,
Flink 目前的blink table planner batch mode
(读hdfs上的orc文件)只支持StreamTableSource和LookupableTableSource,
但是StreamTableSource的并行度默认应该是1 , 底层是ContinuousFileMonitoringFunction ,
那么如何能扩大并行度来优化性能呢?
--
Best,
Jun Su
伪代码发下看看?看下jdbc sink的配置,是不是支持删除记录,更新的时候旧记录被删除了
在 2020-07-27 11:33:31,"郑斌斌" 写道:
>hi all :
>
> 请教个问题,我通过程序拉取kafka消息后,注册为flink流表。然后执行sql: "select user_id, count(*)cnt
> from 流表", 将结果写入到mysql 聚合表中(SINK组件为:flink1.11版本JdbcUpsertTableSink)。
>但问题是,每次JOB重启后,之前mysql
habse??family1
INSERT INTO hTable SELECT rowkey, ROW(null,f1q1) FROM T;
Hi,
Flink 目前的确不支持这个语法... 我已经创建了一个 issue[1],可以在那里跟踪这个 feature 的进展。
[1] https://issues.apache.org/jira/browse/FLINK-18726
于2020年7月27日周一 上午11:36写道:
> 测试Flink版本:1.11.0
>
>
>
> Flink sql支持这种语法插入吗,在插入时指定具体的字段顺序和要插入的列
>
> Insert into tableName(col1[,col2]) select col1[,col2]
>
>
>
> 目前通过测试发现了以下问题
>
Hi,
mysql-connector-java-5.1.38.jar 应该已经包含了 com.mysql.jdbc.Driver 才对;Flink
是以什么模式运行的呢?如果是 standalone session,在 Flink 的 lib 下添加 jar 包之后是否重启过 session
集群呢?另外是否所有的 worker 都添加了 jar 包呢?如果能打出完整的错误栈会更好。
op <520075...@qq.com> 于2020年7月27日周一 下午2:45写道:
> 您好,我创建了一个jdbc的表
>
>
> CREATE TABLE
Hi,
1. 好的,学习了
2.
确实,部分Flink依赖调整为provided,打包测试也可以正常执行,但是flink-walkthrough-common_2.11这种包在Flink的lib中没有看到,还是打包进去了
在 2020-07-27 11:42:50,"Caizhi Weng" 写道:
>Hi,
>
>Flink 的 TableFactory 利用了 Java 的服务发现功能,所以需要这两个文件。需要确认 jar-with-dependencies
>是否能把这些资源文件打进去。
>
>另外为什么需要把 Flink 的依赖也打在大包里呢?因为 Flink 本身的
hi,
kafka->Flink->kafka->mysql
Flink用sql处理之后数据写入kafka里面,格式为json,再用kafka-connect-jdbc将数据导出到mysql中。
使用kafka-connect是方便数据同时导出到其他存储
Flink定义输出表结构:
CREATE TABLE print_table \
(total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \
WITH (\
'connector' = 'kafka', \
'topic' = 'test_out',
Hi,
可以配置 table.exec.resource.default-parallelism 为需要的并发。详见文档[1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism
jun su 于2020年7月27日周一 下午3:50写道:
> hi all,
>
> Flink 目前的blink table planner batch mode
>
??jdbc
CREATE TABLE mvp_dim_anticheat_args_all (
id BIGINT,
dt STRING,
cnt_7d INT,
cnt_30d INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'driver'='com.mysql.jdbc.Driver',
'url' = 'jdbc:mysql://localhost:3306/huyou_oi',
'table-name' =
建议先配置heartbeat.timeout的值大一些,然后把gc log打出来
看看是不是经常发生fullGC,每次持续时间是多长,从你目前提供的log看,进程内JM->RM都会心跳超时
怀疑还是和GC有关的
env.java.opts.jobmanager: -Xloggc:/jobmanager-gc.log
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=512M
Best,
Yang
SmileSmile 于2020年7月27日周一 下午1:50写道:
> Hi,Yang
??
yarn??,sql-client??1.11.0
----
??:
"user-zh"
哇 这个方式很取巧了 好机智 我之前就是一直索引取值 学习一下
-- 原始邮件 --
发件人: Jark Wu
Hi,
版本:Flink-1.11.1
任务启动模式:standalone
Flink任务编译的jar的maven中包含了flink-avro,jar-with-dependencies编译的
org.apache.flink
flink-avro
1.11.1
编译出来的jar也包含了这个class
我看官网上说明 Flink has extensive built-in support for Apache Avro。感觉默认是支持avro的
1. 直接启动的话,会报错
建议确认一下 Yarn 的配置 “yarn.scheduler.minimum-allocation-mb” 在 Yarn RM 和 Flink JM
这两台机器上是否一致。
Yarn 会对 container request 做归一化。例如你请求的 TM container 是 1728m
(taskmanager.memory.process.size) ,如果 minimum-allocation-mb 是 1024m,那么实际得到的
container 大小必须是 minimum-allocation-mb 的整数倍,也就是 2048m。Flink 会去获取 Yarn
Hi,all:
本人基于Flink1.11.1的table API使用Hive方言,调用executSql方法后报错,堆栈信息如下:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Failed to execute sql
at org.apache.flink.client.program.PackagedProgram.callMainMethod(
PackagedProgram.java:302)
你好 测试代码如下
private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
" `data` VARCHAR , " +
" `table` VARCHAR " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'source_databases'," +
" 'properties.bootstrap.servers' = '***'," +
" 'properties.group.id' = 'real1'," +
" 'format'
hi 能给出详细的schema信息吗?
wind.fly@outlook.com 于2020年7月27日周一 下午7:02写道:
> 补充一下,执行的sql如下:
>
> select order_no, order_time from
> x.ods.ods_binlog_test_trip_create_t_order_1
>
>
> 发件人: wind.fly@outlook.com
> 发送时间: 2020年7月27日 18:49
> 收件人: user-zh@flink.apache.org
schema信息如下:
CREATE TABLE x.ods.ods_binlog_test_trip_create_t_order_1 (
_type STRING,
order_no STRING,
order_time STRING,
dt as TO_TIMESTAMP(order_time),
proctime as PROCTIME(),
WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'kafka',
你们是否在多线程环境下使用 TableEnvironment ?
TableEnvironment 不是线程安全的,多线程情况使用可能出现一些莫名其妙的问题。
godfrey he 于2020年7月28日周二 上午9:55写道:
> hi 能给出详细的schema信息吗?
>
> wind.fly@outlook.com 于2020年7月27日周一
> 下午7:02写道:
>
>> 补充一下,执行的sql如下:
>>
>> select order_no, order_time from
>>
不是多线程同时操作一个tableEnvironment,每执行一次都会创建一个TableEnvironment
发件人: godfrey he
发送时间: 2020年7月28日 9:58
收件人: user-zh
主题: Re: flink1.11.0 执行sqlQuery时报NullPointException
你们是否在多线程环境下使用 TableEnvironment ?
TableEnvironment 不是线程安全的,多线程情况使用可能出现一些莫名其妙的问题。
godfrey he
Hi, all.
请问Flink-1.10 on yarn Taskmanager启动的jvm GC 回收器参数默认信息是G1吗?
基本集群环境:hadoop-2.7.5、flink-1.10、jdk-1.8_61,其中jvm相关参数均未进行显示设置。
谢谢。
Best,
Roc Marshal.
我的怀疑点还是多线程引起的。
你能具体描述一下你们gateway的行为吗? 是一个web server?
另外,你可以在table env执行query前加上
RelMetadataQueryBase.THREAD_PROVIDERS
.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()));
这句话临时fix。
wind.fly@outlook.com 于2020年7月28日周二 上午11:02写道:
>
你的包是完整的flink-1.11.1的包吗?
例如 check一下 ClusterClientJobClientAdapter 这个类是否继承 CoordinationRequestGateway
?
shimin huang 于2020年7月28日周二 上午11:21写道:
> Hi,all:
> 本人基于Flink1.11.1的table API使用Hive方言,调用executSql方法后报错,堆栈信息如下:
> org.apache.flink.client.program.ProgramInvocationException: The main method
>
41 matches
Mail list logo