Re: flink 1.8 内的StreamExecutionEnvironment 对于 FileInputFormat 多file path 不兼容问题咨询

2020-03-04 文章 JingsongLee
Hi,

你的需求是什么?下列哪种?
- 1.想用unbounded source,continuous的file source,监控文件夹,发送新文件,且需要支持多文件夹
- 2.只是想用bounded的input format,需要支持多文件

如果是1,现在仍然不支持。
如果是2,那你可以用env.addSource(new InputFormatSourceFunction(..)...)来支持多文件。

Best,
Jingsong Lee


--
From:王智 
Send Time:2020年3月4日(星期三) 17:34
To:user-zh 
Subject:flink 1.8 内的StreamExecutionEnvironment 对于 FileInputFormat 多file path 
不兼容问题咨询

我在使用flink 1.8 自定义 FileInputFormat 
的时候遇到了不兼容问题,初步看了源代码,不知道自己得出的结论是否正确,并想了解一下后续趋势和进展,麻烦各位大佬抽空指点一下,先行感谢~~




问题1: StreamExecutionEnvironment 为什么要做这样的限制?ContinuousFileMonitoringFunction 
的作用是什么? 

相关的代码描述如下




StreamExecutionEnvironment 对 FileInputFormat 对象有特殊的处理逻辑
if (inputFormat instanceof FileInputFormat) {
   @SuppressWarnings("unchecked")
   FileInputFormat

Re: 使用Flink1.10.0读取hive时source并行度问题

2020-03-03 文章 JingsongLee
Hi jun,

Jira: https://issues.apache.org/jira/browse/FLINK-16413
FYI

Best,
Jingsong Lee


--
From:JingsongLee 
Send Time:2020年3月3日(星期二) 19:06
To:Jun Zhang <825875...@qq.com>; user-zh@flink.apache.org 

Cc:user-zh@flink.apache.org ; like 
Subject:Re: 使用Flink1.10.0读取hive时source并行度问题

Hi jun,

很好的建议~ 这是一个优化点~ 可以建一个JIRA

Best,
Jingsong Lee


--
From:Jun Zhang <825875...@qq.com>
Send Time:2020年3月3日(星期二) 18:45
To:user-zh@flink.apache.org ; JingsongLee 

Cc:user-zh@flink.apache.org ; like 
Subject:回复: 使用Flink1.10.0读取hive时source并行度问题



hi,jinsong:
 我想说一个问题, 我开始了自动推断,比如我设置推断的最大并行度是10,
我有一个类似的sql   select * from  mytable limit 1;
hive表mytable有超过10个文件,如果启动了10个并行度是不是有点浪费呢。
在2020年03月2日 16:38,JingsongLee 写道:
建议使用Batch模式来读取Hive table。

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 16:35
To:lzljs3620...@aliyun.com 
Subject:回复: 使用Flink1.10.0读取hive时source并行度问题


我使用的是 StreamTableEnvironment,确实有碰到这个问题呢。
在2020年3月2日 16:16,JingsongLee 写道:   
 自动推断可能面临资源不足无法启动的问题

理论上不应该呀?Batch作业是可以部分运行的。

Best,
Jingsong Lee

--
From:like 
Send Time:2020年3月2日(星期一) 15:35
To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com 

Subject:回复: 使用Flink1.10.0读取hive时source并行度问题


非常感谢!我尝试关闭自动推断后,已经可以控制source并行度了,自动推断可能面临资源不足无法启动的问题。 


在2020年3月2日 15:18,JingsongLee 写道:   Hi, 

1.10中,Hive source是自动推断并发的,你可以使用以下参数配置到flink-conf.yaml里面来控制并发:
- table.exec.hive.infer-source-parallelism=true (默认使用自动推断)
- table.exec.hive.infer-source-parallelism.max=1000 (自动推断的最大并发)

Sink的并发默认和上游的并发相同,如果有Shuffle,使用配置的统一并发。

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 14:58
To:user-zh@flink.apache.org 
Subject:使用Flink1.10.0读取hive时source并行度问题

hi,大家好

 我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的,
而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?  

Re: 使用Flink1.10.0读取hive时source并行度问题

2020-03-03 文章 JingsongLee
Hi jun,

很好的建议~ 这是一个优化点~ 可以建一个JIRA

Best,
Jingsong Lee


--
From:Jun Zhang <825875...@qq.com>
Send Time:2020年3月3日(星期二) 18:45
To:user-zh@flink.apache.org ; JingsongLee 

Cc:user-zh@flink.apache.org ; like 
Subject:回复: 使用Flink1.10.0读取hive时source并行度问题


 
hi,jinsong:
 我想说一个问题, 我开始了自动推断,比如我设置推断的最大并行度是10,
我有一个类似的sql   select * from  mytable limit 1;
hive表mytable有超过10个文件,如果启动了10个并行度是不是有点浪费呢。
在2020年03月2日 16:38,JingsongLee 写道:
建议使用Batch模式来读取Hive table。

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 16:35
To:lzljs3620...@aliyun.com 
Subject:回复: 使用Flink1.10.0读取hive时source并行度问题


我使用的是 StreamTableEnvironment,确实有碰到这个问题呢。
在2020年3月2日 16:16,JingsongLee 写道:   
 自动推断可能面临资源不足无法启动的问题

理论上不应该呀?Batch作业是可以部分运行的。

Best,
Jingsong Lee

--
From:like 
Send Time:2020年3月2日(星期一) 15:35
To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com 

Subject:回复: 使用Flink1.10.0读取hive时source并行度问题


非常感谢!我尝试关闭自动推断后,已经可以控制source并行度了,自动推断可能面临资源不足无法启动的问题。 


在2020年3月2日 15:18,JingsongLee 写道:   Hi, 

1.10中,Hive source是自动推断并发的,你可以使用以下参数配置到flink-conf.yaml里面来控制并发:
- table.exec.hive.infer-source-parallelism=true (默认使用自动推断)
- table.exec.hive.infer-source-parallelism.max=1000 (自动推断的最大并发)

Sink的并发默认和上游的并发相同,如果有Shuffle,使用配置的统一并发。

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 14:58
To:user-zh@flink.apache.org 
Subject:使用Flink1.10.0读取hive时source并行度问题

hi,大家好

 我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的,
而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?  

Re: 开发相关问题咨询Development related problems consultation

2020-03-02 文章 JingsongLee
Hi, welcome,

For user side,

u...@flink.apache.org is for English.
user-zh@flink.apache.org is for Chinese.

d...@flink.apache.org is for development related discussions, so please not 
send to it.

Best,
Jingsong Lee


--
From:王博迪 
Send Time:2020年3月2日(星期一) 17:08
To:user-zh ; dev 
Subject:开发相关问题咨询Development related problems consultation

您好,
   我是你们flink的新用户,有一些开发相关的问题想咨询,问一下可以和哪个邮箱交流。
谢谢
Hello, I am a new user of flink. I would like to ask you some questions related 
to development. I would like to know which email can I communicate with

Re: 使用Flink1.10.0读取hive时source并行度问题

2020-03-02 文章 JingsongLee
建议使用Batch模式来读取Hive table。

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 16:35
To:lzljs3620...@aliyun.com 
Subject:回复: 使用Flink1.10.0读取hive时source并行度问题

  
我使用的是 StreamTableEnvironment,确实有碰到这个问题呢。
在2020年3月2日 16:16,JingsongLee 写道:   
> 自动推断可能面临资源不足无法启动的问题

理论上不应该呀?Batch作业是可以部分运行的。

Best,
Jingsong Lee

--
From:like 
Send Time:2020年3月2日(星期一) 15:35
To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com 

Subject:回复: 使用Flink1.10.0读取hive时source并行度问题


非常感谢!我尝试关闭自动推断后,已经可以控制source并行度了,自动推断可能面临资源不足无法启动的问题。 
 

在2020年3月2日 15:18,JingsongLee 写道:   Hi, 

1.10中,Hive source是自动推断并发的,你可以使用以下参数配置到flink-conf.yaml里面来控制并发:
- table.exec.hive.infer-source-parallelism=true (默认使用自动推断)
- table.exec.hive.infer-source-parallelism.max=1000 (自动推断的最大并发)

Sink的并发默认和上游的并发相同,如果有Shuffle,使用配置的统一并发。

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 14:58
To:user-zh@flink.apache.org 
Subject:使用Flink1.10.0读取hive时source并行度问题

hi,大家好

 我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的,
而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?

Re: 使用Flink1.10.0读取hive时source并行度问题

2020-03-02 文章 JingsongLee
> 自动推断可能面临资源不足无法启动的问题

理论上不应该呀?Batch作业是可以部分运行的。

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 15:35
To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com 

Subject:回复: 使用Flink1.10.0读取hive时source并行度问题

  
非常感谢!我尝试关闭自动推断后,已经可以控制source并行度了,自动推断可能面临资源不足无法启动的问题。 
 

在2020年3月2日 15:18,JingsongLee 写道:Hi, 

1.10中,Hive source是自动推断并发的,你可以使用以下参数配置到flink-conf.yaml里面来控制并发:
- table.exec.hive.infer-source-parallelism=true (默认使用自动推断)
- table.exec.hive.infer-source-parallelism.max=1000 (自动推断的最大并发)

Sink的并发默认和上游的并发相同,如果有Shuffle,使用配置的统一并发。

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 14:58
To:user-zh@flink.apache.org 
Subject:使用Flink1.10.0读取hive时source并行度问题

hi,大家好

 我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的,
而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?  

Re: 使用Flink1.10.0读取hive时source并行度问题

2020-03-01 文章 JingsongLee
Hi, 

1.10中,Hive source是自动推断并发的,你可以使用以下参数配置到flink-conf.yaml里面来控制并发:
- table.exec.hive.infer-source-parallelism=true (默认使用自动推断)
- table.exec.hive.infer-source-parallelism.max=1000 (自动推断的最大并发)

Sink的并发默认和上游的并发相同,如果有Shuffle,使用配置的统一并发。

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 14:58
To:user-zh@flink.apache.org 
Subject:使用Flink1.10.0读取hive时source并行度问题

hi,大家好

  我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的,
而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?

Re: Re: Re: 求助帖:flink 连接kafka source 部署集群报错

2020-01-15 文章 JingsongLee
+user-zh


--
From:JingsongLee 
Send Time:2020年1月15日(星期三) 16:05
To:Others <41486...@qq.com>
Subject:Re: Re: Re: 求助帖:flink 连接kafka source 部署集群报错

是的。
另一个方法是使用[1]的classpath,添加多个jars。

BTW, 回复邮件时请带上user-zh。

Best,
Jingsong Lee

[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#usage


--
From:Others <41486...@qq.com>
Send Time:2020年1月15日(星期三) 15:54
To:user-zh@flink.apache.org JingsongLee 
Subject:回复: Re: Re: 求助帖:flink 连接kafka source 部署集群报错

Hi,
我的集群 是Standalone 方式部署的 是加在 Flink Master机器下么 还是每一台都要加? 加完之后是否需要重启集群?


-- 原始邮件 --
发件人: "JingsongLee";
发送时间: 2020年1月15日(星期三) 下午3:46
收件人: "Others"<41486...@qq.com>;"user-zh";
主题:  Re: Re: 求助帖:flink 连接kafka source 部署集群报错

Hi,

我怀疑的你这样打包会导致meta-inf.services的文件相互覆盖。
你试试把flink-json和flink-kafka的jar直接放入flink/lib下

Best,
Jingsong Lee


--
From:Others <41486...@qq.com>
Send Time:2020年1月15日(星期三) 15:27
To:user-zh@flink.apache.org JingsongLee 
Subject:回复: Re: 求助帖:flink 连接kafka source 部署集群报错


集群环境下应该放在哪个lib下?

一下是打包过程的log
[INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flinkjob ---
[INFO] Including org.apache.flink:flink-core:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-annotations:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-metrics-core:jar:1.9.1 in the shaded 
jar.
[INFO] Including com.esotericsoftware.kryo:kryo:jar:2.24.0 in the shaded jar.
[INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded jar.
[INFO] Including org.objenesis:objenesis:jar:2.1 in the shaded jar.
[INFO] Including commons-collections:commons-collections:jar:3.2.2 in the 
shaded jar.
[INFO] Including org.apache.commons:commons-compress:jar:1.18 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-asm-6:jar:6.2.1-7.0 in the 
shaded jar.
[INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
[INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded jar.
[INFO] Excluding org.apache.flink:force-shading:jar:1.9.1 from the shaded jar.
[INFO] Including org.javassist:javassist:jar:3.19.0-GA in the shaded jar.
[INFO] Including org.scala-lang:scala-library:jar:2.11.12 in the shaded jar.
[INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-guava:jar:18.0-7.0 in the shaded 
jar.
[INFO] Including org.apache.flink:flink-table-api-java-bridge_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java:jar:1.9.1 in the shaded 
jar.
[INFO] Including org.apache.flink:flink-table-planner-blink_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala_2.11:jar:1.9.1 in the 
shaded jar.
[INFO] Including org.scala-lang:scala-reflect:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang:scala-compiler:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-xml_2.11:jar:1.0.5 in the shaded 
jar.
[INFO] Including org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4 
in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala-bridge_2.11:jar:1.9.1 
in the shaded jar.
[INFO] Including org.apache.flink:flink-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1 in the 
shaded jar.
[INFO] Including org.apache.flink:flink-table-runtime-blink_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.codehaus.janino:janino:jar:3.0.9 in the shaded jar.
[INFO] Including org.codehaus.janino:commons-compiler:jar:3.0.9 in the shaded 
jar.
[INFO] Including org.apache.calcite.avatica:avatica-core:jar:1.15.0 in the 
shaded jar.
[INFO] Including org.reflections:reflections:jar:0.9.10 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.9.1 in the 
shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka-base_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.apache.kafka:kafka-clients:jar:2.2.0 in the shaded jar.
[INFO] Including com.github.luben:zstd-jni:jar:1.3.8-1 in the shaded jar.
[INFO] Including org.lz4:lz4-java:jar:1.5.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-common:jar:1.9.1 in the shaded 
jar.
[INFO] Including org.apache.flink:flink-jdbc_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including mysql:mysql-connector-java:jar:5.1.48 in the shaded jar.
[INFO] Including com.google.code.gson:gson:jar:2.8.5 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
[INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
[WARNING] janino-3.0.9.ja

Re: Re: 求助帖:flink 连接kafka source 部署集群报错

2020-01-14 文章 JingsongLee
Hi,

我怀疑的你这样打包会导致meta-inf.services的文件相互覆盖。
你试试把flink-json和flink-kafka的jar直接放入flink/lib下

Best,
Jingsong Lee


--
From:Others <41486...@qq.com>
Send Time:2020年1月15日(星期三) 15:27
To:user-zh@flink.apache.org JingsongLee 
Subject:回复: Re: 求助帖:flink 连接kafka source 部署集群报错


集群环境下应该放在哪个lib下?

一下是打包过程的log
[INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flinkjob ---
[INFO] Including org.apache.flink:flink-core:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-annotations:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-metrics-core:jar:1.9.1 in the shaded 
jar.
[INFO] Including com.esotericsoftware.kryo:kryo:jar:2.24.0 in the shaded jar.
[INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded jar.
[INFO] Including org.objenesis:objenesis:jar:2.1 in the shaded jar.
[INFO] Including commons-collections:commons-collections:jar:3.2.2 in the 
shaded jar.
[INFO] Including org.apache.commons:commons-compress:jar:1.18 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-asm-6:jar:6.2.1-7.0 in the 
shaded jar.
[INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
[INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded jar.
[INFO] Excluding org.apache.flink:force-shading:jar:1.9.1 from the shaded jar.
[INFO] Including org.javassist:javassist:jar:3.19.0-GA in the shaded jar.
[INFO] Including org.scala-lang:scala-library:jar:2.11.12 in the shaded jar.
[INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-guava:jar:18.0-7.0 in the shaded 
jar.
[INFO] Including org.apache.flink:flink-table-api-java-bridge_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java:jar:1.9.1 in the shaded 
jar.
[INFO] Including org.apache.flink:flink-table-planner-blink_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala_2.11:jar:1.9.1 in the 
shaded jar.
[INFO] Including org.scala-lang:scala-reflect:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang:scala-compiler:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-xml_2.11:jar:1.0.5 in the shaded 
jar.
[INFO] Including org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4 
in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala-bridge_2.11:jar:1.9.1 
in the shaded jar.
[INFO] Including org.apache.flink:flink-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1 in the 
shaded jar.
[INFO] Including org.apache.flink:flink-table-runtime-blink_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.codehaus.janino:janino:jar:3.0.9 in the shaded jar.
[INFO] Including org.codehaus.janino:commons-compiler:jar:3.0.9 in the shaded 
jar.
[INFO] Including org.apache.calcite.avatica:avatica-core:jar:1.15.0 in the 
shaded jar.
[INFO] Including org.reflections:reflections:jar:0.9.10 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.9.1 in the 
shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka-base_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.apache.kafka:kafka-clients:jar:2.2.0 in the shaded jar.
[INFO] Including com.github.luben:zstd-jni:jar:1.3.8-1 in the shaded jar.
[INFO] Including org.lz4:lz4-java:jar:1.5.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-common:jar:1.9.1 in the shaded 
jar.
[INFO] Including org.apache.flink:flink-jdbc_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including mysql:mysql-connector-java:jar:5.1.48 in the shaded jar.
[INFO] Including com.google.code.gson:gson:jar:2.8.5 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
[INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
[WARNING] janino-3.0.9.jar, flink-table-planner-blink_2.11-1.9.1.jar define 440 
overlapping classes: 
[WARNING]   - org.codehaus.janino.util.resource.ResourceCreator
[WARNING]   - org.codehaus.janino.ReflectionIClass$ReflectionIField
[WARNING]   - org.codehaus.janino.IClass$1
[WARNING]   - org.codehaus.janino.UnitCompiler$35
[WARNING]   - 
org.codehaus.janino.Java$CompilationUnit$SingleStaticImportDeclaration
[WARNING]   - org.codehaus.janino.Java$PackageMemberEnumDeclaration
[WARNING]   - org.codehaus.janino.UnitCompiler$13$1
[WARNING]   - org.codehaus.janino.Unparser
[WARNING]   - org.codehaus.janino.CodeContext$Branch
[WARNING]   - org.codehaus.janino.UnitCompiler$33$2
[WARNING]   - 430 more...
[WARNING] avatica-core-1.15.0.jar, flink-table-planner-blink_2.11-1.9.1.jar 
define 605 overlapping classes: 
[WARNING]   - org.apache.calcite.avatica.AvaticaParameter
[WARNING]   - org.apache.calcite.avatic

Re: 求助帖:flink 连接kafka source 部署集群报错

2020-01-14 文章 JingsongLee
Hi,

你是不是没有把Json的jar包放入lib下?看起来你的User jar也没用jar-with-dependencies,所以也不会包含json的jar。

Best,
Jingsong Lee


--
From:Others <41486...@qq.com>
Send Time:2020年1月15日(星期三) 15:03
To:user-zh 
Subject:求助帖:flink 连接kafka source 部署集群报错

我使用的flink 版本 是1.9.1
本地调试正常。部署集群启动时报一下错误
2020-01-15 11:57:44,255 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Unhandled 
exception.
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: findAndCreateTableSource failed.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource 
failed.
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
at 
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
at 
com.doumob.flink.BuoyDataJob.main(BuoyDataJob.java:86)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No factory implements 
'org.apache.flink.table.factories.DeserializationSchemaFactory'.

The following properties are requested:
connector.properties.0.key=group.id
connector.properties.0.value=consumer_flink_etl_test
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=172.16.0.148:9092,172.16.0.149:9092,172.16.0.150:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_etl_pro
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=cTime
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=2000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=event
schema.1.type=VARCHAR
schema.2.name=adSpaceKey
schema.2.type=VARCHAR
schema.3.name=appkey
schema.3.type=VARCHAR
schema.4.name=build
schema.4.type=VARCHAR
schema.5.name=buoyId
schema.5.type=BIGINT
schema.6.name=gameHtmlId
schema.6.type=BIGINT
schema.7.name=uid
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
at 
org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:243)
at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:186)
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259)
at 

Re: 求助帖: 流join场景可能出现的重复计算

2020-01-14 文章 JingsongLee
Hi ren,

Blink的deduplication功能应该是能match你的需求。[1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication

Best,
Jingsong Lee


--
From:Caizhi Weng 
Send Time:2020年1月15日(星期三) 11:53
To:user-zh 
Subject:Re: 求助帖: 流join场景可能出现的重复计算

Hi,

Flink 目前认为所有的 source 都是 append only 的,retract、upsert 等都是内部处理时的概念,对用户是不可见的。

所以目前你只能先通过 group by 和 last_value 等方式实现功能。不过 1.11 有计划支持这样的需求。

Ren Xie  于2020年1月14日周二 下午9:30写道:

> 谢谢
>
> 考虑过group by , 实际中 一个好多字段的表, 保不准就是那个字段发生了变化.
>
> 请问 类似的双流操作在开发中常见吗, 怎么都搜不到相似的使用, 按理谁流依赖另一个流做处理 应该算常见吧
>
> 还是说我这样的需求呀 实现呀 是野路子?
>
> Yuan,Youjun  于2020年1月14日周二 下午8:22写道:
>
> > 取决于具体的场景。想到的有如下几种方案:
> > 1,group by student_id和student_name,而不是只group by
> > student_id。当然前提是修改同名名字不会推送一条消息到流1.
> > 2,过滤掉update的消息
> > 3,基于时间窗口的聚合,对于student表的数据,每n秒输出一个唯一的student_id,然后再与score流join。
> >
> > -邮件原件-
> > 发件人: xin Destiny 
> > 发送时间: Tuesday, January 14, 2020 6:39 PM
> > 收件人: user-zh@flink.apache.org
> > 主题: Re: 求助帖: 流join场景可能出现的重复计算
> >
> > Hi,
> > 如果说插入两条update操作呢,一次分数是-97,一次是97
> >
> >
> >
> >
> > Ren Xie  于2020年1月14日周二 下午6:20写道:
> >
> > > 实际场景还是有点复杂的, 便于理解 我简化成这样的,  简化后的这个, 没有实际的代码, 抱歉
> > >
> > > 大致 写一下 也就是这样了
> > > ```sql
> > > select sum(score)
> > > from
> > > student t1 inner join score t2 on t1.student_id = t2.std_id where
> > > t1.student_id = 11
> > > ```
> > > 然后
> > >
> > > ```Java
> > > String sql = ↑;
> > > Table t = tEnv.sqlQuery(sql);
> > > DataStream stream1 = tEnv.toAppendStream(t, Integer.class);
> > > stream1.keyBy("").sum("");
> > > ```
> > >
> > > 这样的一个sql, 在student表插入一个数据, score表插入2个数据后, 会执行一次计算出一个结果97 + 98
> > >
> > > update 学生表的name后, 一个新事件进入student的流, 还会触发一次计算, 得到97 + 98
> > >
> > > 因为可能有新的成绩插入, 所以对 stream1进行sum操作, 导致 97和98 都被重复计算了一次
> > >
> > >
> > > Caizhi Weng  于2020年1月14日周二 下午5:49写道:
> > >
> > > > Hi,
> > > >
> > > > 有可能的话,是否方便提供一下代码呢?
> > > >
> > > > Ren Xie  于2020年1月14日周二 下午5:38写道:
> > > >
> > > > > 学生
> > > > > student_id name
> > > > > 11 foo
> > > > >
> > > > > 学科分数
> > > > > id name score std_id
> > > > > 100 math 97 11
> > > > > 101 english 98 11
> > > > >
> > > > > 有如下一个场景(假设只有一个学生)
> > > > >
> > > > > 基于binlog检测这2个表的变化, 计算这个学生的总分数, 使用了Table/SQL API join操作计算
> > > > >
> > > > > 假设insert以上数据后到达某时刻, 以上数据都进入了flink, 计算出这个学生总分数 97 + 98 = 195
> > > > >
> > > > > 但此时发现学生姓名登记错误, 于是进行了修改,
> > > > > 结果此时Flink中学生流中有2个事件(insert的一个+update的一个), 分数流中有2个事件, 计算的总分数就会是 2 *
> > > > > (97
> > > +
> > > > > 98) = 390
> > > > >
> > > > > Q: 请问这种场景下使用什么能够解决, 计算出正确的结果 97 + 98 = 193
> > > > >
> > > > > 接触flink不久, 不是太了解, 请大佬给个提示, 谢谢!!
> > > > >
> > > >
> > >
> >
>


Re: blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-13 文章 JingsongLee
谢谢,
你可以试下最新的1.9版本或是1.10或是master吗?因为这里修了一些bug,不确定还存在不。

Best,
Jingsong Lee


--
From:Kevin Liao 
Send Time:2020年1月14日(星期二) 11:38
To:user-zh ; JingsongLee 
Subject:Re: blink planner的org.apache.flink.table.api.ValidationException报错

flink 版本是 1.9.1 release

Doc 完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约 
30 多个字段,我理解这跟字段数关系不大

```
import org.apache.commons.lang3.builder.ToStringBuilder;
import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
 * @author liaoxu Date: 2020/1/13 Time: 12:03 下午.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Doc {

  private String suv;
  private Float factor = 1F;
  private String st;
  private String agentId;
  private Long timestamp;
  ... // omit some, omit getters and setters
```

希望有帮助,或者您可以在钉钉联系我(钉钉号 ib1x1zy)
JingsongLee  于2020年1月14日周二 上午11:25写道:
Hi Kevin,

 这是什么版本?
 Doc类能完整提供下吗?方便我们复现。

 Best,
 Jingsong Lee


 --
 From:Kevin Liao 
 Send Time:2020年1月13日(星期一) 17:37
 To:user-zh 
 Subject:blink planner的org.apache.flink.table.api.ValidationException报错

 tEnv.connect(new Kafka()
 .version("universal")
 .topic("xxx")
 .startFromLatest()
 .property("bootstrap.servers",
 "")
 .property("group.id", ""))
 .withFormat(new Json().failOnMissingField(false).deriveSchema())
 .withSchema(new Schema()
 //.field("logger_name", Types.STRING)
 //.field("host", Types.STRING)
 //.field("@timestamp", Types.SQL_TIMESTAMP)
 //.field("_rowtime", Types.SQL_TIMESTAMP)
 //.rowtime(
 //new
 Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
 .field("doc", Types.POJO(Doc.class))
 )
 .inAppendMode()
 .registerTableSource("xxx");

 Table result = tEnv.sqlQuery(
 "SELECT doc.xxx1, doc.xxx2,  ... , doc.xxxN as seq FROM xxx");

 //result.printSchema();
 tEnv.toAppendStream(result,
 new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
 STRING, STRING, STRING,
 STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
 STRING, STRING, STRING,
 STRING, LONG, STRING, INT, STRING, INT)).print();



 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下:


 、、、

 Exception in thread "main"
 org.apache.flink.table.api.ValidationException: Type
 LEGACY(PojoType) of
 table field 'doc' does not match with type
 PojoType of the field
 'doc' of the TableSource return type.
  at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
  at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at 
org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
  at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
  at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
  at 
org.apache.flink.table.planner.plan.nodes.physica

Re: 注册table时catalog无法变更

2020-01-07 文章 JingsongLee
Hi xiyueha,

你可以用TableEnv.sqlUpdate("create table ...")的DDL的方式,这会注册到当前catalog中。

Best,
Jingsong Lee


--
From:Kurt Young 
Send Time:2020年1月8日(星期三) 09:17
To:user-zh 
Cc:xiyueha 
Subject:Re: 注册table时catalog无法变更

临时表的话只能放在指定的catalog中,不建议将临时表注册到另一个catalog,比如hive catalog。
临时表大部分情况下是不能序列化的,那样的话代码会报错。

Best,
Kurt


On Tue, Jan 7, 2020 at 9:20 PM 贺小令  wrote:

> hi,
>
> streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
> 注册的表都是Temporary Table。
>
> 你可以通过:
> catalog = new InMemoryExternalCatalog(catalogName);
> streamTableEnvironment.registerCatalog(catalogName, catalog);
> catalog.createTable()
>
> 或者
> streamTableEnvironment.getCatalog().get().createTable()
>
> 的方式来注册表到指定的catalog
>
>
> xiyu...@163.com  于2020年1月7日周二 下午3:20写道:
>
> > hi,各位:
> >
> >
> 我在开发过程中,通过下面方式注册table时,默认使用的catalog是EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
> > streamTableEnvironment.registerDataStream(tableName, dataStream,
> >
> fields);尝试通过下面方式解决,但是注册的table仍然在EnvironmentSettings.DEFAULT_BUILTIN_CATALOG中
> >   streamTableEnvironment.registerCatalog(catalogName, new
> > InMemoryExternalCatalog(catalogName));
> > streamTableEnvironment.useCatalog(catalogName);请问,我如何将table注册到指定的catalog?
> >
> >
> > xiyu...@163.com
> >
>


Re: FLINK 1.9.1 StreamingFileSink 压缩问题

2020-01-01 文章 JingsongLee
Hi,

看起来你只能改下connector代码才能支持压缩了:
ParquetAvroWriters.createAvroParquetWriter里:设置AvroParquetWriter.Builder的压缩格式。

Best,
Jingsong Lee


--
From:USERNAME 
Send Time:2020年1月2日(星期四) 13:36
To:user-zh 
Subject:FLINK 1.9.1 StreamingFileSink 压缩问题

各位好,FLINK 1.9.1 使用 StreamingFileSink 写Parquet到HDFS,能启用压缩吗?

--代码
StreamingFileSink sink = StreamingFileSink
.forBulkFormat(new Path(FILE_HDFS_PATH), 
ParquetAvroWriters.forReflectRecord(HDFSBean.class))
.withBucketAssigner(new DateTimeBucketAssigner<>(FILE_HDFS_FORMAT))

.build();



Re: How should i set the field type in mysql when i use temporal table join between kafka and jdbc ?

2020-01-01 文章 JingsongLee
Hi,

user-zh我就说中文啦.
你需要设置成bigint.
具体报什么错?

Best,
Jingsong Lee


--
From:刘世民 
Send Time:2020年1月2日(星期四) 13:47
To:user-zh 
Subject:How should i set the field type in mysql when i use temporal table join 
between kafka and jdbc ?


for example, num_count field type is Long, but no matter if i set it to bigint 
or something else in mysql table, it has always report errorr...
so,what can i should set num_count field type in mysql? thanks

Best !
amenhub



Re: StreamTableEnvironment.registerDatastream() 开放用户自定义的schemaDescriptionh和DeserializationSchema

2019-12-31 文章 JingsongLee
Hi aven,

这是个合理的需求。
现在的问题是:
- Flink table只支持Row, Pojo, Tuple, CaseClass作为结构化的数据类型。
- 
而你的类型是JSONObject,它其实也是一个结构化的数据类型,但是目前Flink不支持它,所以可以考虑有这样的DeserializationSchema机制来支持它。

但是我理解其实没有差别多少,比如你提供RowDeserializationSchema,其实就是JSONObject到Row的转换,那你完全可以把这个套在DataStream.map中,把它转换成Flink
 table支持的结构化类型。

Best,
Jingsong Lee


--
From:aven.wu 
Send Time:2019年12月31日(星期二) 14:09
To:user-zh@flink.apache.org 
Subject:回复: StreamTableEnvironment.registerDatastream() 
开放用户自定义的schemaDescriptionh和DeserializationSchema

你好!
“把 JSONObject类型定义成object类型” 可以解决在确定字段和类型的情况下并且需要编码到程序中。
如果能开放这部分的能力,可以不通过编码(新增POJO)的方式来完成一个Datastream 到 stream 的table注册。

best wish
发送自 Windows 10 版邮件应用

发件人: Terry Wang
发送时间: 2019年12月30日 12:37
收件人: user-zh@flink.apache.org
主题: Re: StreamTableEnvironment.registerDatastream() 
开放用户自定义的schemaDescriptionh和DeserializationSchema

你这种需求的一种解决思路,可以把 
JSONObject类型定义成object类型,然后注册成table之后通过一个UDTF把JSONObject转换成特定的schema。

Best,
Terry Wang



> 2019年12月27日 19:56,aven.wu  写道:
> 
> StreamTableEnvironment.registerDatastream(),目前只支持pojo 或者 
> 是public属性的对象,根据默认字段或者用户指定的字段注册table,但某些场景下没有固定的格式,比如使用JSONObject类型DataStream就无法通过这种方法注册成table,是否可以提供更底层的API来使table注册的灵活性更高。用户传入schema的描述和自定义的解析器DeserializationSchema.
> 
> 
> 发送自 Windows 10 版邮件应用
> 




Re: Flink1.9批任务yn和ys对任务的影响

2019-12-26 文章 JingsongLee
SQL的Batch作业是会关闭slotsharing的。

Best,
Jingsong Lee


--
From:faaron zheng 
Send Time:2019年12月26日(星期四) 17:23
To:user-zh@flink.apache.org 
Subject:回复:Flink1.9批任务yn和ys对任务的影响

了解了,感谢三位。我的slot上包括一个hash-join一个hash-agg,加起来刚好256mb。不过因为存在slotsharing的原因,感觉并不容易提前判断。
 faaron zheng 邮箱:faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月26日 
15:09,JingsongLee 写道: Hi faaron zheng, 如kurt所说,强烈建议使用1.10,现在已拉分支。 
TM运行的一个经验值是:TM有10个Slot,TM内存10G:JVM堆内4G、1G网络buffer、manage内存5G(也就是说单个slot的manage内存500M)。
 Best, Jingsong Lee 
-- From:Kurt 
Young  Send Time:2019年12月26日(星期四) 14:07 To:user-zh 
 Subject:Re: Flink1.9批任务yn和ys对任务的影响 
也可以试下最新的1.10版本,这个版本里面 sql 的算子已经不再申请固定写死的内存数量, 而是根据当时 slot 能提供多少 managed 
内存来自适应了。 Best, Kurt On Thu, Dec 26, 2019 at 1:36 PM Xintong Song 
 wrote: > slot需要多少内存是和具体作业相关的,不同作业差别会比较大。 > > 
slot的资源需求是根据所有算子的资源需求相加得到的,如果你对你的作业用到了哪些算子比较了解的话,可以根据算子的资源需求推算出来。 > 
算子的默认资源需求可以参考 [1],里面有五个“table.exec.resource.*”的配置项,也可以调整这些配置项来更改算子使用的内存。 > > 
如果对作业使用到的算子不是很了解的话,那比较简单的办法还是直接提交作业试试看,去日志里面搜"Request slot with > 
profile"就能够看到slot的资源需求。 > > Thank you~ > > Xintong Song > > > [1] > > 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options
 > > On Thu, Dec 26, 2019 at 11:36 AM faaron zheng  > 
wrote: > > > 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed > > 
memory为2g,也就是一个slot平均200m,所以任务没调度起来。 > > 
但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱: > > 
faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月26日 11:23,faaron zheng 写道: > > 
感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed > > 
memory为2g,也就是一个slot平均200m,所以任务没调度起来。 > > 
但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱: > > 
faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月25日 11:30,Xintong Song 写道: > > Hi 
faaron, Flink 1.9 中 -yn参数应该是不生效的,后续版本中已经删除了这个参数。 根据你的参数,在每个 TM > > 
的内存为30G不变的情况下,每个 TM 中的slot个数(-ys)从5变成10,也就意味着平均每个slot占用的内存变为了原来的一半。 Flink > > 
1.9 的sql batch 算子对 flink managed memory 是有确定的需求的,很可能是这个变化导致单个 slot > 的managed > 
> memory无法满足算子的资源需求了。 Thank you~ Xintong Song On Wed, Dec 25, 2019 at 11:09 > > 
AM faaron zheng  wrote: > 跑tpcds的query1: flink > run > > 
-m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g > -ytm 30g 任务可以正常执行 flink > > 
run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm > 60g -ytm 30g > > 
任务在做hashjoin的时候就会失败 报错是No pooled slot available and request to > > > 
ResourceManager for new slot failed 搞不懂这有啥关系,求指教 faaron zheng 邮箱: > > > 
faaronzh...@gmail.com 签名由 网易邮箱大师 定制 >

Re: Flink1.9批任务yn和ys对任务的影响

2019-12-25 文章 JingsongLee
Hi faaron zheng,

如kurt所说,强烈建议使用1.10,现在已拉分支。

TM运行的一个经验值是:TM有10个Slot,TM内存10G:JVM堆内4G、1G网络buffer、manage内存5G(也就是说单个slot的manage内存500M)。

Best,
Jingsong Lee


--
From:Kurt Young 
Send Time:2019年12月26日(星期四) 14:07
To:user-zh 
Subject:Re: Flink1.9批任务yn和ys对任务的影响

也可以试下最新的1.10版本,这个版本里面 sql 的算子已经不再申请固定写死的内存数量,
而是根据当时 slot 能提供多少 managed 内存来自适应了。

Best,
Kurt


On Thu, Dec 26, 2019 at 1:36 PM Xintong Song  wrote:

> slot需要多少内存是和具体作业相关的,不同作业差别会比较大。
>
> slot的资源需求是根据所有算子的资源需求相加得到的,如果你对你的作业用到了哪些算子比较了解的话,可以根据算子的资源需求推算出来。
> 算子的默认资源需求可以参考 [1],里面有五个“table.exec.resource.*”的配置项,也可以调整这些配置项来更改算子使用的内存。
>
> 如果对作业使用到的算子不是很了解的话,那比较简单的办法还是直接提交作业试试看,去日志里面搜"Request slot with
> profile"就能够看到slot的资源需求。
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options
>
> On Thu, Dec 26, 2019 at 11:36 AM faaron zheng 
> wrote:
>
> > 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed
> > memory为2g,也就是一个slot平均200m,所以任务没调度起来。
> > 但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱:
> > faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月26日 11:23,faaron zheng 写道:
> > 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed
> > memory为2g,也就是一个slot平均200m,所以任务没调度起来。
> > 但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱:
> > faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月25日 11:30,Xintong Song 写道:
> > Hi faaron, Flink 1.9 中 -yn参数应该是不生效的,后续版本中已经删除了这个参数。 根据你的参数,在每个 TM
> > 的内存为30G不变的情况下,每个 TM 中的slot个数(-ys)从5变成10,也就意味着平均每个slot占用的内存变为了原来的一半。 Flink
> > 1.9 的sql batch 算子对 flink managed memory 是有确定的需求的,很可能是这个变化导致单个 slot
> 的managed
> > memory无法满足算子的资源需求了。 Thank you~ Xintong Song On Wed, Dec 25, 2019 at 11:09
> > AM faaron zheng  wrote: > 跑tpcds的query1: flink
> run
> > -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g > -ytm 30g 任务可以正常执行 flink
> > run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm > 60g -ytm 30g
> > 任务在做hashjoin的时候就会失败 报错是No pooled slot available and request to >
> > ResourceManager for new slot failed 搞不懂这有啥关系,求指教 faaron zheng 邮箱: >
> > faaronzh...@gmail.com 签名由 网易邮箱大师 定制
>


Re: Flink实时数仓落Hive一般用哪种方式好?

2019-12-10 文章 JingsongLee
实时性取决于checkpoint时间间隔。
Flink这边的sink没有合并小文件的功能。

Best,
Jingsong Lee


--
From:陈帅 
Send Time:2019年12月10日(星期二) 21:45
To:JingsongLee 
Subject:Re: Flink实时数仓落Hive一般用哪种方式好?

我想要的streaming写就是数据实时写入HDFS文件,场景有实时数据仓库等。需要平衡实时性以及小文件过多的问题。目前处理小文件问题的方法都是在事后合并文件吗?
JingsongLee  于2019年12月10日周二 上午10:48写道:

Hi 陈帅,

1.BulkWriter.Factory接口不适合ORC, 正如yue ma所说,你需要一些改动
2.StreamingFileSink整个机制都是基于做checkpoint才会真正move文件的,不知道你所想的streaming写是什么,以及对你的业务场景有什么要求吗?

Best,
Jingsong Lee

--
From:陈帅 
Send Time:2019年12月10日(星期二) 08:21
To:user-zh@flink.apache.org ; JingsongLee 

Subject:Re: Flink实时数仓落Hive一般用哪种方式好?

1. 相比Parquet,目前StreamingFileSink支持ORC的难点在哪里呢?
2. BulkWriter是不是攒微批写文件的?
JingsongLee  于2019年12月9日周一 下午3:24写道:
Hi 帅,
 - 目前可以通过改写StreamingFileSink的方式来支持Parquet。
 (但是目前StreamingFileSink支持ORC比较难)
 - BulkWriter和批处理没有关系,它只是StreamingFileSink的一种概念。
 - 如果sync hive分区,这需要自定义了,目前StreamingFileSink没有现成的。

 在1.11中,Table层会持续深入这方面的处理,实时数仓落hive,在后续会一一解决数据倾斜、分区可见性等问题。[1]

 [1] https://issues.apache.org/jira/browse/FLINK-14249

 Best,
 Jingsong Lee


 --
 From:陈帅 
 Send Time:2019年12月8日(星期日) 10:04
To:user-zh@flink.apache.org 
 Subject:Flink实时数仓落Hive一般用哪种方式好?

 有人说直接写到HBase,再在Hive关联Hbase表
 但是我想直接写文件到HDFS,再通过Hive外表加载,不过有几个问题不明白:

 1. 如果用行式格式实时写没有问题,StreamingFileSink也支持,但是如果我想用列式格式(如Parquet或ORC)
 写的话,目前来看没有现成的Streaming
 Writer,官方提供的都是
 BulkWriter,只支持批处理方式。是不是不建议用列式格式进行流式写?如果能的话需要自己定义StreamingWriter吗?对于
 业务上的Update和Delete操作 数据一般是如何sync进Hive的?

 2. 写入HDFS文件后,要怎么让Hive自动发现新加的分区?能在Flink ETL程序中一并完成吗?


Re: Flink RetractStream如何转成AppendStream?

2019-12-10 文章 JingsongLee
目前不能由SQL直接转。

Best,
Jingsong Lee


--
From:陈帅 
Send Time:2019年12月10日(星期二) 21:48
To:JingsongLee 
Subject:Re: Flink RetractStream如何转成AppendStream?

代码api的方式我知道怎么转,想知道用sql的方式要如何转?需要先写到一张临时表再sink到目标表?有例子吗?
JingsongLee  于2019年12月10日周二 上午10:49写道:

 参考下lucas.wu的例子?

Best,
Jingsong Lee

--
From:陈帅 
Send Time:2019年12月10日(星期二) 08:25
To:user-zh@flink.apache.org ; JingsongLee 

Subject:Re: Flink RetractStream如何转成AppendStream?

"你可以先把RetractStream转成DataStream,这样就出现了Tuple的stream,然后你再写个MapFunc过滤,最后通过DataStream写入Kafka中。"
==>我想知道通过Flink SQL方式要如何实现这种转换?
JingsongLee  于2019年12月9日周一 下午3:17写道:
Hi 帅,

 你可以先把RetractStream转成DataStream,这样就出现了Tuple的stream,然后你再写个MapFunc过滤,最后通过DataStream写入Kafka中。

 Best,
 Jingsong Lee


 --
 From:Jark Wu 
 Send Time:2019年12月8日(星期日) 11:54
 To:user-zh 
 Subject:Re: Flink RetractStream如何转成AppendStream?

 Hi,

 目前 Kafka  只支持 append mode,所以无法消费 retract stream。在 Flink 1.11,社区将计划支持
 RetractStream 转成 AppendStream 的操作,以 changelog 的方式输出。

 Best,
 Jark

 On Sun, 8 Dec 2019 at 10:08, 陈帅  wrote:

 > 在用Flink做实时数仓时遇到group by统计后需要将结果发到kafka,但是现在的kafka
 >
 > sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka?
 >


Re: Re: Flink实时数仓落Hive一般用哪种方式好?

2019-12-09 文章 JingsongLee
Hi hjxhainan,

如果你要取消订阅。
请发送邮件到user-zh-unsubscr...@flink.apache.org

Best,
Jingsong Lee


--
From:hjxhai...@163.com 
Send Time:2019年12月10日(星期二) 10:52
To:user-zh ; JingsongLee ; 
陈帅 
Subject:Re: Re: Flink实时数仓落Hive一般用哪种方式好?


怎么退出邮件订阅




hjxhai...@163.com 
发件人: JingsongLee
发送时间: 2019-12-10 10:48
收件人: 陈帅; user-zh@flink.apache.org
主题: Re: Flink实时数仓落Hive一般用哪种方式好?
Hi 陈帅,
1.BulkWriter.Factory接口不适合ORC, 正如yue ma所说,你需要一些改动
2.StreamingFileSink整个机制都是基于做checkpoint才会真正move文件的,不知道你所想的streaming写是什么,以及对你的业务场景有什么要求吗?
Best,
Jingsong Lee
--
From:陈帅 
Send Time:2019年12月10日(星期二) 08:21
To:user-zh@flink.apache.org ; JingsongLee 

Subject:Re: Flink实时数仓落Hive一般用哪种方式好?
1. 相比Parquet,目前StreamingFileSink支持ORC的难点在哪里呢?
2. BulkWriter是不是攒微批写文件的?
JingsongLee  于2019年12月9日周一 下午3:24写道:
Hi 帅,
 - 目前可以通过改写StreamingFileSink的方式来支持Parquet。
 (但是目前StreamingFileSink支持ORC比较难)
 - BulkWriter和批处理没有关系,它只是StreamingFileSink的一种概念。
 - 如果sync hive分区,这需要自定义了,目前StreamingFileSink没有现成的。
 在1.11中,Table层会持续深入这方面的处理,实时数仓落hive,在后续会一一解决数据倾斜、分区可见性等问题。[1]
 [1] https://issues.apache.org/jira/browse/FLINK-14249
 Best,
 Jingsong Lee
 --
 From:陈帅 
 Send Time:2019年12月8日(星期日) 10:04
To:user-zh@flink.apache.org 
 Subject:Flink实时数仓落Hive一般用哪种方式好?
 有人说直接写到HBase,再在Hive关联Hbase表
 但是我想直接写文件到HDFS,再通过Hive外表加载,不过有几个问题不明白:
 1. 如果用行式格式实时写没有问题,StreamingFileSink也支持,但是如果我想用列式格式(如Parquet或ORC)
 写的话,目前来看没有现成的Streaming
 Writer,官方提供的都是
 BulkWriter,只支持批处理方式。是不是不建议用列式格式进行流式写?如果能的话需要自己定义StreamingWriter吗?对于
 业务上的Update和Delete操作 数据一般是如何sync进Hive的?
 2. 写入HDFS文件后,要怎么让Hive自动发现新加的分区?能在Flink ETL程序中一并完成吗?  

Re: Flink RetractStream如何转成AppendStream?

2019-12-09 文章 JingsongLee
参考下lucas.wu的例子?

Best,
Jingsong Lee


--
From:陈帅 
Send Time:2019年12月10日(星期二) 08:25
To:user-zh@flink.apache.org ; JingsongLee 

Subject:Re: Flink RetractStream如何转成AppendStream?

"你可以先把RetractStream转成DataStream,这样就出现了Tuple的stream,然后你再写个MapFunc过滤,最后通过DataStream写入Kafka中。"
==>我想知道通过Flink SQL方式要如何实现这种转换?
JingsongLee  于2019年12月9日周一 下午3:17写道:
Hi 帅,

 你可以先把RetractStream转成DataStream,这样就出现了Tuple的stream,然后你再写个MapFunc过滤,最后通过DataStream写入Kafka中。

 Best,
 Jingsong Lee


 --
 From:Jark Wu 
 Send Time:2019年12月8日(星期日) 11:54
 To:user-zh 
 Subject:Re: Flink RetractStream如何转成AppendStream?

 Hi,

 目前 Kafka  只支持 append mode,所以无法消费 retract stream。在 Flink 1.11,社区将计划支持
 RetractStream 转成 AppendStream 的操作,以 changelog 的方式输出。

 Best,
 Jark

 On Sun, 8 Dec 2019 at 10:08, 陈帅  wrote:

 > 在用Flink做实时数仓时遇到group by统计后需要将结果发到kafka,但是现在的kafka
 >
 > sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka?
 >


Re: Flink实时数仓落Hive一般用哪种方式好?

2019-12-09 文章 JingsongLee
Hi 陈帅,

1.BulkWriter.Factory接口不适合ORC, 正如yue ma所说,你需要一些改动
2.StreamingFileSink整个机制都是基于做checkpoint才会真正move文件的,不知道你所想的streaming写是什么,以及对你的业务场景有什么要求吗?

Best,
Jingsong Lee


--
From:陈帅 
Send Time:2019年12月10日(星期二) 08:21
To:user-zh@flink.apache.org ; JingsongLee 

Subject:Re: Flink实时数仓落Hive一般用哪种方式好?

1. 相比Parquet,目前StreamingFileSink支持ORC的难点在哪里呢?
2. BulkWriter是不是攒微批写文件的?
JingsongLee  于2019年12月9日周一 下午3:24写道:
Hi 帅,
 - 目前可以通过改写StreamingFileSink的方式来支持Parquet。
 (但是目前StreamingFileSink支持ORC比较难)
 - BulkWriter和批处理没有关系,它只是StreamingFileSink的一种概念。
 - 如果sync hive分区,这需要自定义了,目前StreamingFileSink没有现成的。

 在1.11中,Table层会持续深入这方面的处理,实时数仓落hive,在后续会一一解决数据倾斜、分区可见性等问题。[1]

 [1] https://issues.apache.org/jira/browse/FLINK-14249

 Best,
 Jingsong Lee


 --
 From:陈帅 
 Send Time:2019年12月8日(星期日) 10:04
To:user-zh@flink.apache.org 
 Subject:Flink实时数仓落Hive一般用哪种方式好?

 有人说直接写到HBase,再在Hive关联Hbase表
 但是我想直接写文件到HDFS,再通过Hive外表加载,不过有几个问题不明白:

 1. 如果用行式格式实时写没有问题,StreamingFileSink也支持,但是如果我想用列式格式(如Parquet或ORC)
 写的话,目前来看没有现成的Streaming
 Writer,官方提供的都是
 BulkWriter,只支持批处理方式。是不是不建议用列式格式进行流式写?如果能的话需要自己定义StreamingWriter吗?对于
 业务上的Update和Delete操作 数据一般是如何sync进Hive的?

 2. 写入HDFS文件后,要怎么让Hive自动发现新加的分区?能在Flink ETL程序中一并完成吗?


Re: [flink-sql]使用tableEnv.sqlUpdate(ddl);方式创表,如何指定rowtime?

2019-12-08 文章 JingsongLee
Hi 猫猫:

在DDL上定义rowtime是刚刚支持的功能,文档正在编写中。[1]
你可以通过master的代码来试用,社区正在准备发布1.10,到时候会有release版本可用。

[2] 中有使用的完整例子,FYI。

[1] https://issues.apache.org/jira/browse/FLINK-14320
[2] 
https://github.com/apache/flink/blob/2ecf7cacbe742099d78c528de962fccf13c14629/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimeAttributeITCase.scala

Best,
Jingsong Lee


--
From:猫猫 <16770...@qq.com>
Send Time:2019年12月6日(星期五) 17:52
To:user-zh 
Subject:[flink-sql]使用tableEnv.sqlUpdate(ddl);方式创表,如何指定rowtime?

我使用tableEnv.sqlUpdate(ddl);方式创建表


但是我无法像通过流注册方式一样,添加rowtime。我尝试在定义字段上添加rowtime,但是创表语句会报错。
请问在flink中是否支持使用该种方式创建流表,并开窗?


我的例子是一个csv源,我还没有尝试kafka源,但是我看了文档,也没有找到相关描述。


sql创表语句如下:
CREATE TABLE T_UserBehavior(
   userId BIGINT,
   itemId BIGINT,
   categoryId BIGINT,
   behavior VARCHAR,
   optime BIGINT
) WITH (
  'connector.type' = 'filesystem',   -- required: specify to 
connector type
  'connector.path' = 
'file:///E:\MyGitProject\flink-study\Hot-Item\src\main\resources\UserBehavior-less.csv',
  -- required: path to a file or directory
  'format.type' = 'csv',
  'format.fields.0.name' = 'userId', -- required: define the schema 
either by using type information
  'format.fields.0.type' = 'BIGINT',
  'format.fields.1.name' = 'itemId',
  'format.fields.1.type' = 'BIGINT',
  'format.fields.2.name' = 'categoryId',
  'format.fields.2.type' = 'BIGINT',
  'format.fields.3.name' = 'behavior',
  'format.fields.3.type' = 'VARCHAR',
  'format.fields.4.name' = 'optime',
  'format.fields.4.type' = 'BIGINT'
);

Re: Flink实时数仓落Hive一般用哪种方式好?

2019-12-08 文章 JingsongLee
Hi 帅,
- 目前可以通过改写StreamingFileSink的方式来支持Parquet。
(但是目前StreamingFileSink支持ORC比较难)
- BulkWriter和批处理没有关系,它只是StreamingFileSink的一种概念。
- 如果sync hive分区,这需要自定义了,目前StreamingFileSink没有现成的。

在1.11中,Table层会持续深入这方面的处理,实时数仓落hive,在后续会一一解决数据倾斜、分区可见性等问题。[1]

[1] https://issues.apache.org/jira/browse/FLINK-14249

Best,
Jingsong Lee


--
From:陈帅 
Send Time:2019年12月8日(星期日) 10:04
To:user-zh@flink.apache.org 
Subject:Flink实时数仓落Hive一般用哪种方式好?

有人说直接写到HBase,再在Hive关联Hbase表
但是我想直接写文件到HDFS,再通过Hive外表加载,不过有几个问题不明白:

1. 如果用行式格式实时写没有问题,StreamingFileSink也支持,但是如果我想用列式格式(如Parquet或ORC)
写的话,目前来看没有现成的Streaming
Writer,官方提供的都是
BulkWriter,只支持批处理方式。是不是不建议用列式格式进行流式写?如果能的话需要自己定义StreamingWriter吗?对于
业务上的Update和Delete操作 数据一般是如何sync进Hive的?

2. 写入HDFS文件后,要怎么让Hive自动发现新加的分区?能在Flink ETL程序中一并完成吗?


Re: Flink RetractStream如何转成AppendStream?

2019-12-08 文章 JingsongLee
+1 to lucas.wu

Best,
Jingsong Lee


--
From:lucas.wu 
Send Time:2019年12月9日(星期一) 11:39
To:user-zh 
Subject:Re: Flink RetractStream如何转成AppendStream?

可以使用类似的方式
//   val sstream = result4.toRetractStream[Row],filter(_.1==trye).map(_._2)
//   val result5 = tEnv.fromDataStream(sstream)
//   result5.toAppendStream[Row].print()


原始邮件
发件人:Jark wuimj...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2019年12月8日(周日) 11:53
主题:Re: Flink RetractStream如何转成AppendStream?


Hi, 目前 Kafka 只支持 append mode,所以无法消费 retract stream。在 Flink 1.11,社区将计划支持 
RetractStream 转成 AppendStream 的操作,以 changelog 的方式输出。 Best, Jark On Sun, 8 Dec 
2019 at 10:08, 陈帅 casel.c...@gmail.com wrote:  在用Flink做实时数仓时遇到group 
by统计后需要将结果发到kafka,但是现在的kafka   
sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka?

Re: Flink RetractStream如何转成AppendStream?

2019-12-08 文章 JingsongLee
Hi 帅,

你可以先把RetractStream转成DataStream,这样就出现了Tuple的stream,然后你再写个MapFunc过滤,最后通过DataStream写入Kafka中。

Best,
Jingsong Lee


--
From:Jark Wu 
Send Time:2019年12月8日(星期日) 11:54
To:user-zh 
Subject:Re: Flink RetractStream如何转成AppendStream?

Hi,

目前 Kafka  只支持 append mode,所以无法消费 retract stream。在 Flink 1.11,社区将计划支持
RetractStream 转成 AppendStream 的操作,以 changelog 的方式输出。

Best,
Jark

On Sun, 8 Dec 2019 at 10:08, 陈帅  wrote:

> 在用Flink做实时数仓时遇到group by统计后需要将结果发到kafka,但是现在的kafka
>
> sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka?
>


Re: DML去重,translate时报错

2019-11-21 文章 JingsongLee
Hi 叶贤勋:

现在去重现在支持insert into select 语法。
问题在于你的这个SQL怎么没产出UniqueKey
这里面可能有blink-planner的bug。
CC: @Jark Wu @godfrey he (JIRA)

Best,
Jingsong Lee


--
From:叶贤勋 
Send Time:2019年11月21日(星期四) 16:20
To:user-zh@flink.apache.org 
Subject:DML去重,translate时报错

Hi 大家好:
Flink版本1.9.0,
SQL1:
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);
SQL2:

CREATE TABLE user_dist (
dt VARCHAR,
user_id VARCHAR,
behavior VARCHAR
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
'connector.table' = 'user_behavior_dup',
'connector.username' = 'root',
'connector.password' = ‘**',
'connector.write.flush.max-rows' = '1'
);
SQL3:

INSERT INTO user_dist
SELECT
  dt,
  user_id,
  behavior
FROM (
   SELECT
  dt,
  user_id,
  behavior,
 ROW_NUMBER() OVER (PARTITION BY dt, user_id, behavior ORDER BY proc desc ) 
AS rownum
   FROM (select DATE_FORMAT(ts, '-MM-dd HH:00') as 
dt,user_id,behavior,PROCTIME() as proc
from user_log) )
WHERE rownum = 1;


在对SQL3执行tableEnv.sqlUpdate(SQL3)时,报错:
Exception in thread "main" org.apache.flink.table.api.TableException: 
UpsertStreamTableSink requires that Table has a full primary keys if it is 
updated.
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)


请问去重现在不支持insert into select 语法吗?


| |
叶贤勋
|
|
yxx_c...@163.com
|
签名由网易邮箱大师定制




Re: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

2019-09-05 文章 JingsongLee
override getResultType方法,返回Types.SQL_TIMESTAMP.
这样应该可以绕过。
1.10会修复这个问题。

Best,
Jingsong Lee


--
From:守护 <346531...@qq.com>
Send Time:2019年9月5日(星期四) 12:11
To:user-zh@flink.apache.org JingsongLee ; user-zh 

Subject:回复: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

在哪声明DataType,这个要引入什么包吗,求指点,我的udf代码如下:

import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;


public class UTC2Local extends ScalarFunction {
public Timestamp eval(Timestamp s) {
long timestamp = s.getTime() + 2880;
return new Timestamp(timestamp);
}

}



-- 原始邮件 --
发件人: "JingsongLee";
发送时间: 2019年9月5日(星期四) 中午11:55
收件人: "user-zh";
主题:  Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

你声明了DataType吗?代码怎么写的?
由于目前只支持精度<=3,所以你得用DataTypes.TIMESTAMP(3)来表示。

Best,
Jingsong Lee


--
From:守护 <346531...@qq.com>
Send Time:2019年9月5日(星期四) 11:48
To:user-zh 
Subject:flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

社区的各位大佬好:


使用场景:flink1.9版本使用flinkSQL创建udf函数使用没有问,当切换到blinkSQL使用这个udf就会报错TIMESTAMP类型错误,udf实现的功能也很简单,就是将时间+8小时,报错信息如下


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Not support dataType: TIMESTAMP(9)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
at 
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at 
java.security.AccessController.doPrivileged(Native Method)
at 
javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Not support dataType: TIMESTAMP(9)
at 
org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType(DataFormatConverters.java:248)
at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.isConverterIdentity(CodeGenUtils.scala:661)
at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:669)
at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:665)
at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternalIfNeeded(CodeGenUtils.scala:687)
at 
org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:79)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:737)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:451)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
at 
org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)

Re: 回复: 关于Flink SQL DISTINCT问题

2019-09-04 文章 JingsongLee
一般是按时间(比如天)来group by,state配置了超时过期的时间。
基本的去重方式就是靠state(比如RocksDbState)。
有mini-batch来减少对state的访问。

如果有倾斜,那是解倾斜问题的话题了。

Best,
Jingsong Lee


--
From:lvwenyuan 
Send Time:2019年9月4日(星期三) 15:11
To:user-zh 
Subject:Re:回复: 关于Flink SQL DISTINCT问题

对,肯定是按照窗口去重的。我就想问下,窗口去重时,所采用的方式
在 2019-09-04 14:38:29,"athlon...@gmail.com"  写道:
>在窗口内去重吧,不可能无限保留去重数据的
>
>
>
>athlon...@gmail.com
> 
>发件人: lvwenyuan
>发送时间: 2019-09-04 14:28
>收件人: user-zh
>主题: 关于Flink SQL DISTINCT问题
>各位大佬好:
>   我想问下,关于flink sql的实时去重,就是count(distinct user_id) 
> 。就是Flink内部是如何做到实时去重,如果对于数据量比较大的时候实时去重,是否会有性能问题。用的Blink Planner


Re: Flink SQL 时间问题

2019-09-03 文章 JingsongLee
Hi:
1.是的,目前只能是UTC,如果你有计算要求,你可以考虑改变的业务的窗口时间。
2.支持long的,你输入是不是int才会报错的,具体报错的信息?

Best,
Jingsong Lee


--
From:hb <343122...@163.com>
Send Time:2019年9月3日(星期二) 10:44
To:user-zh 
Subject:Flink SQL 时间问题

使用kafka connectorDescriptor , 从kafka读取json格式数据, 生成Table


```
  ...


  schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()


  schema
.field("_rowtime", Types.SQL_TIMESTAMP())
.rowtime(
  new Rowtime()
.timestampsFromField("eventTime")
.watermarksPeriodicBounded(1000)
)
```


问题1.  生成的 _proctime 处理时间字段, 结果显示的时区是UTC, 怎么调整成 +8时区.
问题2.  eventTime 事件时间字段怎么支持Long类型.


我输入到kafka记录为 {"eventTime": 10, "id":1,"name":"hb"} 会提示 eventTime 字段类型问题

Re: [Discuss] What should the "Data Source" be translated into Chinese

2019-08-13 文章 JingsongLee
可以直接保留不用翻译吗?

Best,
Jingsong Lee


--
From:WangHengwei 
Send Time:2019年8月13日(星期二) 11:50
To:user-zh 
Subject:[Discuss] What should the "Data Source" be translated into Chinese

Hi all,


I'm working on [FLINK-13405] Translate "Basic API Concepts" page into 
Chinese. I have a problem. 

Usually we translate "Data Source" into "数据源" but there is no agreed 
translation for "Data Sink". Since it often appears in documents, I think we'd 
better to have a unified translation. I have some alternatives, e.g. 
"数据沉淀","数据归" or "数据终".

 Committer Xingcan Cui has a good suggestion for "数据汇" which corresponds to 
source ("数据源"). I asked Committer Jark Wu, he is also fine with it. I think 
"数据汇" is a good representation of flow charactiristics so I would like to use 
it. 


I want to hear more thoughts from the community whether we should translate 
it and what it should be translated into.


Thanks,
WangHW