Re: Using cumulate window in Flink 1.13.0

2021-05-27, posted by 范瑞
Hello
Please refer to the example in the official docs; you need to use the window TVF syntax.


https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-tvf/#cumulate


Best
fanrui



--- Original Message ---
From: "邹云鹤"

Re: Using cumulate window in Flink 1.13.0

2021-05-27, posted by 邹云鹤
OK, I'll look into it further.


邹云鹤
kevinyu...@163.com
On May 28, 2021 at 11:51, Leonard Xu wrote:
Hi,

The cumulate window is based on the window TVF syntax, which differs from the previous groupWindowFunction; you can refer to [1].
The window TVF also supports tumble and hop windows, with better performance. The session window is expected in 1.14;
if you need session windows now, you can use the legacy syntax.

Best,
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-agg/#windowing-tvfs


On May 28, 2021 at 11:43, 邹云鹤 wrote:

insert into `test_out` select a.uid, 'dt', max(a.age) from `test_in` as a group 
by uid, cumulate(PROCTIME(), interval '1' minute, interval '1' hour);


Hello everyone, how should the cumulate window be used in a streaming job on Flink 1.13.0? The SQL above doesn't seem to work.
Could anyone who has used it give some advice?




邹云鹤
kevinyu...@163.com


Re: Using cumulate window in Flink 1.13.0

2021-05-27, posted by Leonard Xu
Hi,

The cumulate window is based on the window TVF syntax, which differs from the previous groupWindowFunction; you can refer to [1].
The window TVF also supports tumble and hop windows, with better performance. The session window is expected in 1.14;
if you need session windows now, you can use the legacy syntax.

Best,
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-agg/#windowing-tvfs
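For reference, a minimal sketch of the original query rewritten with the window TVF syntax. It assumes `test_in` declares a processing-time attribute column, hypothetically named `proctime` (e.g. `proctime AS PROCTIME()` in the DDL), since the DESCRIPTOR argument must be a time attribute; `window_start` stands in for the literal 'dt' column of the original query:

-- Sketch only: CUMULATE is applied as a table-valued function,
-- and window_start/window_end must appear in the GROUP BY.
INSERT INTO `test_out`
SELECT uid, window_start, MAX(age)
FROM TABLE(
    CUMULATE(TABLE `test_in`, DESCRIPTOR(proctime), INTERVAL '1' MINUTE, INTERVAL '1' HOUR))
GROUP BY uid, window_start, window_end;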


> On May 28, 2021 at 11:43, 邹云鹤 wrote:
> 
> insert into `test_out` select a.uid, 'dt', max(a.age) from `test_in` as a 
> group by uid, cumulate(PROCTIME(), interval '1' minute, interval '1' hour);
> 
> 
> Hello everyone, how should the cumulate window be used in a streaming job on Flink 1.13.0? The SQL above doesn't seem to work.
> Could anyone who has used it give some advice?
> 
> 
> 
> 
> 邹云鹤
> kevinyu...@163.com



Using cumulate window in Flink 1.13.0

2021-05-27, posted by 邹云鹤
insert into `test_out` select a.uid, 'dt', max(a.age) from `test_in` as a group 
by uid, cumulate(PROCTIME(), interval '1' minute, interval '1' hour);


Hello everyone, how should the cumulate window be used in a streaming job on Flink 1.13.0? The SQL above doesn't seem to work.
Could anyone who has used it give some advice?




邹云鹤
kevinyu...@163.com

Flink Hive batch job throws FileNotFoundException

2021-05-27, posted by bowen li
Hi everyone,
When a batch table job writes into Hive, a FileNotFound error occurs: the .staging file cannot be found.
Version: 1.12.1; deployment: standalone.
The error message is as follows:
Caused by: java.lang.Exception: Failed to finalize execution on master
  ... 33 more
Caused by: org.apache.flink.table.api.TableException: Exception in 
finalizeGlobal
  at 
org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:94)
  at 
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:148)
  at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1368)
  ... 32 more
Caused by: java.io.FileNotFoundException: File 
hdfs://nameservice1/user/hive/warehouse/flinkdb.db/ods_csc_zcdrpf_test_utf8/.staging_1622171066985
 does not exist.
  at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:986)
  at 
org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:122)
  at 
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1046)
  at 
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1043)
  at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
  at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1053)
  at 
org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170)
  at 
org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:137)
  at 
org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:93)
  at 
org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:92)
  ... 34 more

Re: Flink dimension tables

2021-05-27, posted by Leonard Xu
Hi


> 1. Does this temporal table have to be a changelog stream, i.e. debezium -> kafka? Can't I use flink cdc as for the main table?
> My test with flink cdc didn't work, because it seems a flink cdc dimension table can't declare a watermark?

The lookup dimension table we usually speak of is also a kind of temporal table. As the word "lookup" suggests, each record arriving on the main stream looks up
one record in the external DB by the lookup key, so the dimension table consulted is naturally the latest version; this is the semantics of a proctime temporal join.

An event-time temporal join instead fetches the version corresponding to the event time. The difference from a proctime temporal
join is that a proctime temporal join can only fetch the latest version, while an event-time temporal join can fetch the version matching that event time.

Declaring a temporal table in Flink only requires two attributes, a primary key and an event time, and the primary key can even be inferred. See the documentation [1].

> 2. Do both the order table and the temporal table need continuous incoming data to advance the watermark?
Yes, an event-time temporal join is driven jointly by the watermarks of both streams.


Best,
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/dev/table/concepts/versioned_tables/#%e6%97%b6%e6%80%81%e8%a1%a8
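To make the two required attributes concrete, here is a minimal sketch of an event-time temporal join; all table and column names are illustrative, not from this thread:

-- Sketch only: `currency_rates` is the versioned (temporal) table; its DDL
-- declares a primary key and an event-time attribute with a watermark.
-- Each order joins the rate version valid at o.order_time, driven forward
-- by the watermarks of both streams.
SELECT o.order_id, o.price * r.rate AS converted_price
FROM orders AS o
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
  ON o.currency = r.currency;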

> 
> Please kindly explain.
> 
> 
> --- Original Message ---
> From: "user-zh"
> Sent: Thursday, May 27, 2021, 5:14 PM
> To: "user-zh"
> Subject: Re: Flink dimension tables
> 
> 
> 
> Hi,
> Yes, it can be modified, and no job restart is needed after the change.
> After a change, the Flink fact stream can immediately query the latest dimension data (provided the looked-up data has no cache enabled).
> You can think of the lookup dimension table syntax: A join B for system time as of A.proctime on A.id
>  
> 
> Best,
> Leonard
> 
> On May 27, 2021 at 16:35, liujian <13597820...@qq.com> wrote:
> 
> When a Flink lookup table serves as a dimension table, does that mean the lookup table can't have inserts or updates, and the job must be restarted if it changes?
> How can this be done without restarting the job?



Flink 1.13: executing a SQL file on HDFS via the SQL CLI

2021-05-27, posted by guozhi mang
Hi all,
I saw in the Flink 1.13 documentation that the SQL CLI can now execute a SQL file via the -f option.

But when I try to run it with bin/sql-client.sh -f hdfs:///user/username/test.sql,
it reports the following error:


Exception in thread "main"
org.apache.flink.table.client.SqlClientException: Unexpected exception.
This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: org.apache.flink.util.WrappingRuntimeException:
org.apache.flink.util.DynamicCodeLoadingException: Cannot find configured
state backend factory class: hdfs
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:913)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:854)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.(StreamExecutionEnvironment.java:247)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.(StreamExecutionEnvironment.java:216)
at
org.apache.flink.table.client.gateway.context.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:185)
at
org.apache.flink.table.client.gateway.context.ExecutionContext.createTableEnvironment(ExecutionContext.java:112)
at
org.apache.flink.table.client.gateway.context.ExecutionContext.(ExecutionContext.java:68)
at
org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:231)
at
org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:100)
at
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:91)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
... 1 more
Caused by: org.apache.flink.util.DynamicCodeLoadingException: Cannot find
configured state backend factory class: hdfs
at
org.apache.flink.runtime.state.StateBackendLoader.loadUnwrappedStateBackendFromConfig(StateBackendLoader.java:173)
at
org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:220)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:911)
... 12 more
Caused by: java.lang.ClassNotFoundException: hdfs
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.flink.runtime.state.StateBackendLoader.loadUnwrappedStateBackendFromConfig(StateBackendLoader.java:168)
... 14 more

Judging from the error, it looks like it is caused by missing HDFS support. Could anyone advise how to solve this problem,
and which JAR packages need to be added?


Re: Flink 1.12.2 build error

2021-05-27, posted by Zhiwen Sun
Thanks. I took a look: junit does depend on org.hamcrest, and the relevant versions are all fine.

So what causes this error? Where did hamcrest get excluded? And why did manually adding the dependency fix it? The code was pulled fresh and never modified.

Zhiwen Sun



On Fri, May 28, 2021 at 10:58 AM Shuo Cheng  wrote:

> Hi, org.hamcrest is a dependency of junit
>
> On Fri, May 28, 2021 at 10:28 AM Zhiwen Sun  wrote:
>
> > The build fails as soon as it reaches the Test utils : Junit module
> >
> > Maven version: 3.2.5
> > JDK version: 1.8.0_251
> > Flink version: 1.12.2
> > Command: mvn clean install -DskipTests -Dfast
> >
> > Error message:
> >
> > [ERROR] COMPILATION ERROR :
> > [INFO] -
> > [ERROR]
> >
> >
> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,27]
> > package org.hamcrest does not exist
> > [ERROR]
> >
> >
> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,1]
> > static import only from classes and interfaces
> > [ERROR]
> >
> >
> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[39,27]
> > package org.hamcrest does not exist
> >
> > [INFO] Reactor Summary:
> > [INFO]
> > [INFO] Flink : Tools : Force Shading .. SUCCESS [
> >  1.042 s]
> > [INFO] Flink :  SUCCESS [
> >  1.404 s]
> > [INFO] Flink : Annotations  SUCCESS [
> >  0.735 s]
> > [INFO] Flink : Test utils : ... SUCCESS [
> >  0.042 s]
> > [INFO] Flink : Test utils : Junit . FAILURE [
> >  0.283 s]
> >
> >
> > It looks like the org.hamcrest dependency is missing.
> > I checked flink-test-utils-parent/pom.xml and flink-test-utils-junit/pom.xml,
> > and indeed neither adds an org.hamcrest dependency; I don't know how this is supposed to work.
> >
> > Could anyone tell me what the cause is?
> >
> >
> > Zhiwen Sun
> >
>


Re: Flink 1.12.2 build error

2021-05-27, posted by Shuo Cheng
Hi, org.hamcrest is a dependency of junit

On Fri, May 28, 2021 at 10:28 AM Zhiwen Sun  wrote:

> The build fails as soon as it reaches the Test utils : Junit module
>
> Maven version: 3.2.5
> JDK version: 1.8.0_251
> Flink version: 1.12.2
> Command: mvn clean install -DskipTests -Dfast
>
> Error message:
>
> [ERROR] COMPILATION ERROR :
> [INFO] -
> [ERROR]
>
> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,27]
> package org.hamcrest does not exist
> [ERROR]
>
> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,1]
> static import only from classes and interfaces
> [ERROR]
>
> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[39,27]
> package org.hamcrest does not exist
>
> [INFO] Reactor Summary:
> [INFO]
> [INFO] Flink : Tools : Force Shading .. SUCCESS [
>  1.042 s]
> [INFO] Flink :  SUCCESS [
>  1.404 s]
> [INFO] Flink : Annotations  SUCCESS [
>  0.735 s]
> [INFO] Flink : Test utils : ... SUCCESS [
>  0.042 s]
> [INFO] Flink : Test utils : Junit . FAILURE [
>  0.283 s]
>
>
> It looks like the org.hamcrest dependency is missing.
> I checked flink-test-utils-parent/pom.xml and flink-test-utils-junit/pom.xml,
> and indeed neither adds an org.hamcrest dependency; I don't know how this is supposed to work.
>
> Could anyone tell me what the cause is?
>
>
> Zhiwen Sun
>


Re: Flink 1.12.2 build error

2021-05-27, posted by Zhiwen Sun
After I manually added the dependency, this module compiled, but the runtime module then failed.

[INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first) @
flink-runtime_2.11 ---
[INFO] /data/flink-release-1.12.2/flink-runtime/src/main/java:-1: info:
compiling
[INFO] /data/flink-release-1.12.2/flink-runtime/src/main/scala:-1: info:
compiling
[INFO] Compiling 1958 source files to
/data/flink-release-1.12.2/flink-runtime/target/classes at 1622169188312
[ERROR] java.lang.NoClassDefFoundError: scala/reflect/internal/Trees
[INFO] at java.lang.ClassLoader.defineClass1(Native Method)
[INFO] at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
[INFO] at
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
[INFO] at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
[INFO] at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
[INFO] at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
[INFO] at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
[INFO] at java.security.AccessController.doPrivileged(Native Method)
[INFO] at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
[INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
[INFO] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
[INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
[INFO] at java.lang.ClassLoader.defineClass1(Native Method)
[INFO] at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
[INFO] at
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
[INFO] at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
[INFO] at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
[INFO] at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
[INFO] at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
[INFO] at java.security.AccessController.doPrivileged(Native Method)
[INFO] at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
[INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
[INFO] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
[INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
[INFO] at java.lang.Class.getDeclaredMethods0(Native Method)
[INFO] at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
[INFO] at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
[INFO] at java.lang.Class.getMethod0(Class.java:3018)
[INFO] at java.lang.Class.getMethod(Class.java:1784)
[INFO] at scala_maven_executions.MainHelper.runMain(MainHelper.java:155)
[INFO] at
scala_maven_executions.MainWithArgsInFile.main(MainWithArgsInFile.java:26)
[INFO] Caused by: java.lang.ClassNotFoundException:
scala.reflect.internal.Trees
[INFO] at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
[INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
[INFO] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
[INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
[INFO] ... 31 more
[INFO]

[INFO] Reactor Summary:
[INFO]
[INFO] Flink : Tools : Force Shading .. SUCCESS [
 1.093 s]
[INFO] Flink :  SUCCESS [
 1.168 s]
[INFO] Flink : Annotations  SUCCESS [
 1.218 s]
[INFO] Flink : Test utils : ... SUCCESS [
 0.047 s]
[INFO] Flink : Test utils : Junit . SUCCESS [
 0.795 s]
[INFO] Flink : Metrics : .. SUCCESS [
 0.037 s]
[INFO] Flink : Metrics : Core . SUCCESS [
 0.501 s]
[INFO] Flink : Core ... SUCCESS [
17.510 s]
[INFO] Flink : Java ... SUCCESS [
 2.467 s]
[INFO] Flink : Queryable state : .. SUCCESS [
 0.029 s]
[INFO] Flink : Queryable state : Client Java .. SUCCESS [
 3.106 s]
[INFO] Flink : FileSystems : .. SUCCESS [
 0.031 s]
[INFO] Flink : FileSystems : Hadoop FS  SUCCESS [
 3.457 s]
[INFO] Flink : Runtime  FAILURE [
18.086 s]


Then I changed the Scala version to 2.12,
with the command: mvn clean install -DskipTests -Dfast -Dscala-2.12
Now it compiles normally. I see the release builds do support scala-2.11; is something wrong with my environment?

Zhiwen Sun



On Fri, May 28, 2021 at 10:28 AM Zhiwen Sun  wrote:

> The build fails as soon as it reaches the Test utils : Junit module
>
> Maven version: 3.2.5
> JDK version: 1.8.0_251
> Flink version: 1.12.2
> Command: mvn clean install -DskipTests -Dfast
>
> Error message:
>
> [ERROR] COMPILATION ERROR :
> [INFO] -
> [ERROR]
> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,27]
> package org.hamcrest does not exist
> [ERROR]
> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,1]
> static import 

Re: Flink dimension tables

2021-05-27, posted by liujian
Hi:
Thanks, Leonard. My main table is flink cdc, and the table being joined is a changelog (debezium -> kafka) temporal table with an event time; the lookup is by event time.
1. Does this temporal table have to be a changelog stream, i.e. debezium -> kafka? Can't I use flink cdc as for the main table?
My test with flink cdc didn't work, because it seems a flink cdc dimension table can't declare a watermark?
2. Do both the order table and the temporal table need continuous incoming data to advance the watermark?


Please kindly explain.


--- Original Message ---
From: "user-zh"
> A join B for system time as of A.proctime on A.id = B.id

Flink 1.12.2 build error

2021-05-27, posted by Zhiwen Sun
The build fails when it reaches the Test utils : Junit module

Maven version: 3.2.5
JDK version: 1.8.0_251
Flink version: 1.12.2
Command: mvn clean install -DskipTests -Dfast

Error message:

[ERROR] COMPILATION ERROR :
[INFO] -
[ERROR]
/data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,27]
package org.hamcrest does not exist
[ERROR]
/data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,1]
static import only from classes and interfaces
[ERROR]
/data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[39,27]
package org.hamcrest does not exist

[INFO] Reactor Summary:
[INFO]
[INFO] Flink : Tools : Force Shading .. SUCCESS [
 1.042 s]
[INFO] Flink :  SUCCESS [
 1.404 s]
[INFO] Flink : Annotations  SUCCESS [
 0.735 s]
[INFO] Flink : Test utils : ... SUCCESS [
 0.042 s]
[INFO] Flink : Test utils : Junit . FAILURE [
 0.283 s]


Also, I checked flink-test-utils-parent/pom.xml and flink-test-utils-junit/pom.xml,
and indeed neither adds an org.hamcrest dependency.

Could anyone tell me what the cause is?


Zhiwen Sun


Re: Supporting hyphens (-) in field names when registering a table via createTemporaryView

2021-05-27, posted by Qishang
Hi Jun Zou,
tEnv.createTemporaryView("`Word-Count`", input, $("word"), $("frequency"));
Try adding backticks (`) as above.

Jun Zou wrote on Wed, May 26, 2021 at 4:33 PM:

> Hi all,
> I'm developing a job on Flink 1.11.2. For compatibility with internal legacy code, I need to register the source manually as a table, called as:
>
> > tableEnv.createTemporaryView(tableSource.getName, source, fields.toArray:
> > _*)
> >
> Here tableEnv is a StreamTableEnvironment, source is a DataStream[Row] representing the operator produced by the source
> connector, and fields are Expressions converted from the field names of the processed source table; the field
> names are turned into expressions with the *ExpressionParser.parseExpression* method.
>
> Normally registration succeeds.
> However, when a field name contains a hyphen, e.g. a source field named
> "X-Oem", ExpressionParser.parseExpression parses it as "minus(X, Oem)"
> instead of the expected "X-Oem", so the registered table's field names don't match those used in the DML statement and an error is raised.
>
> Is there any way to handle this situation?
>


Re: Stream-to-stream left join

2021-05-27, posted by Shuo Cheng
As I understand it, a regular dual-stream join should meet the need; the downside is that all data is kept in state, and you can only rely on state TTL to clean it up.

On 5/27/21, chenchencc <1353637...@qq.com> wrote:
> Can CEP SQL be used for batch processing? I'd like unified stream and batch.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Flink 1.12.2 build error

2021-05-27, posted by Zhiwen Sun
The build fails as soon as it reaches the Test utils : Junit module

Maven version: 3.2.5
JDK version: 1.8.0_251
Flink version: 1.12.2
Command: mvn clean install -DskipTests -Dfast

Error message:

[ERROR] COMPILATION ERROR :
[INFO] -
[ERROR]
/data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,27]
package org.hamcrest does not exist
[ERROR]
/data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,1]
static import only from classes and interfaces
[ERROR]
/data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[39,27]
package org.hamcrest does not exist

[INFO] Reactor Summary:
[INFO]
[INFO] Flink : Tools : Force Shading .. SUCCESS [
 1.042 s]
[INFO] Flink :  SUCCESS [
 1.404 s]
[INFO] Flink : Annotations  SUCCESS [
 0.735 s]
[INFO] Flink : Test utils : ... SUCCESS [
 0.042 s]
[INFO] Flink : Test utils : Junit . FAILURE [
 0.283 s]


It looks like the org.hamcrest dependency is missing.
I checked flink-test-utils-parent/pom.xml and flink-test-utils-junit/pom.xml,
and indeed neither adds an org.hamcrest dependency; I don't know how this is supposed to work.

Could anyone tell me what the cause is?


Zhiwen Sun


Re: Stream-to-stream left join

2021-05-27, posted by chenchencc
Can CEP SQL be used for batch processing? I'd like unified stream and batch.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Stream-to-stream left join

2021-05-27, posted by chenchencc
OK, thanks.
Is there any other way using plain SQL? Would defining a versioned dimension table work?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Maximum retained checkpoints with the RocksDB state backend

2021-05-27, posted by tison
That's not how RocksDB incremental checkpoints work; overall, recovery will not fail. For the reasons, see the materials below:

-
https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
the official blog post
- https://www.bilibili.com/video/BV1db411e7x2 Dr. Shi's talk; the relevant part starts around minute 24

Best,
tison.


casel.chen wrote on Thu, May 27, 2021 at 11:35 PM:

> My job uses the incremental RocksDB state backend. If I also set a maximum number of retained checkpoints, could that make RocksDB
> state recovery fail? For example, if a full state recovery needed the latest 10 checkpoints, but the maximum retained count is set to 5, would the state be unrecoverable?


Maximum retained checkpoints with the RocksDB state backend

2021-05-27, posted by casel.chen
My job uses the incremental RocksDB state backend. If I also set a maximum number of retained checkpoints, could that make RocksDB
state recovery fail? For example, if a full state recovery needed the latest 10 checkpoints, but the maximum retained count is set to 5, would the state be unrecoverable?

How to derive a job's data lineage from Flink SQL?

2021-05-27, posted by casel.chen
How can a job's data lineage be derived from its Flink SQL? I want to find lineage like: source table A --> lookup table B --> sink
table C

Re: Re: Flink SQL CDC parallelism question

2021-05-27, posted by casel.chen
My job uses Flink SQL to consume the MySQL CDC
binlog and sync it to MongoDB in real time. With a single parallelism, MongoDB writes may not keep up with MySQL writes, so I need higher parallelism on the sink side.
I'm not sure how to write a keyBy in SQL; should it be a GROUP BY on the
pk? My original idea was to open a thread pool inside MongoDBSinkFunction, one thread per downstream sink parallelism, each with its own queue; MongoDBSinkFunction would dispatch records to the corresponding queue by their PK, and each consumer thread
would pull from its own queue and do batched inserts. Is this feasible?


On 2021-05-26 14:22:11, "Zorro" wrote:
>The mysql-cdc connector can only be set to a parallelism of one, mainly for these reasons:
>1. The MySQL binlog is essentially a single file; consuming it with multiple parallel tasks would have to avoid duplication
>2. Multiple parallel consumers make ordering hard to guarantee
>
>The sink can be set to higher parallelism, but ordering is then not guaranteed. If records with the same primary key must go to the same sink subtask, do a keyBy first and keep the keyBy parallelism equal to the sink parallelism; that mostly keeps the data in forward transfer, though it still cannot guarantee ordering 100%.
>
>If ordering must be guaranteed, a sink parallelism of 1 is still recommended.
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Stream-to-stream left join

2021-05-27, posted by JasonLee
Hi,
Take a look at whether an interval join can satisfy your requirement.
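A minimal sketch of such an interval join for this scenario; the stream and column names are assumed, and both streams need event-time attributes with watermarks. Since A arrives at most 3 hours after B, each A row is paired with B rows from the preceding 3 hours:

-- Sketch only: keeps B rows whose event time lies in [A.rowtime - 3h, A.rowtime].
SELECT A.id, A.v AS a_val, B.v AS b_val
FROM A LEFT JOIN B
  ON A.id = B.id
 AND B.rowtime BETWEEN A.rowtime - INTERVAL '3' HOUR AND A.rowtime;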



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink SQL execution plan

2021-05-27, posted by 流弊
Is there any way to see the state retention time range of a Flink table?

Re: Stream-to-stream left join

2021-05-27, posted by jinghuiwang
SQL can define CEP, capturing both the matched stream and the timed-out stream.

Sent from my iPhone

> On May 27, 2021 at 7:40 PM, 流弊 <1353637...@qq.com> wrote:
> 
> Our scenario joins two streams: stream A LEFT JOINs stream B. Stream A arrives later than stream B, at most 3 hours later. Is there any way to implement this with Flink SQL?


Stream-to-stream left join

2021-05-27, posted by 流弊
Our scenario joins two streams: stream A LEFT JOINs stream B. Stream A arrives later than stream B, at most 3 hours later. Is there any way to implement this with Flink SQL?

Flink 1.12: SupervisorActor blocked when submitting a job via the REST API

2021-05-27, posted by seuzxc



Hi all,
When I submit a job to the JobManager via the REST API, the creation of the JobMaster's
corresponding actor gets blocked. From the source code, the logic here merely registers an actor (I'm not very familiar with Akka's internals). Has anyone run into a similar issue? (It stays blocked
for about an hour and then recovers by itself.) The blocked thread info is as follows:

cluster-io-thread-14 BLOCKED blocked on java.lang.Object@67bb821e owned by
"cluster-io-thread-16" Id=218 at
org.apache.flink.runtime.rpc.akka.AkkaRpcService.registerAkkaRpcActor
(AkkaRpcService.java:308) at
org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer
(AkkaRpcService.java:227) at org.apache.flink.runtime.rpc.RpcEndpoint.
(RpcEndpoint.java:127) at org.apache.flink.runtime.rpc.FencedRpcEndpoint.
(FencedRpcEndpoint.java:48) at org.apache.flink.runtime.jobmaster.JobMaster.
(JobMaster.java:235) at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService
(DefaultJobMasterServiceFactory.java:95) at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService
(DefaultJobMasterServiceFactory.java:39) at
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.
(JobManagerRunnerImpl.java:162) at
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner
(DefaultJobManagerRunnerFactory.java:86) at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5
(Dispatcher.java:478) at
org.apache.flink.runtime.dispatcher.Dispatcher$$Lambda$796/1707020487.get
(flink-akka.actor.default-dispatcher-4684) at
java.util.concurrent.CompletableFuture$AsyncSupply.run
(CompletableFuture.java:1604) at
java.util.concurrent.ThreadPoolExecutor.runWorker
(ThreadPoolExecutor.java:1149) at
java.util.concurrent.ThreadPoolExecutor$Worker.run
(ThreadPoolExecutor.java:624) at java.lang.Thread.run 




cluster-io-thread-16 WAITING waiting on
java.util.concurrent.CompletableFuture$Signaller@60dd3bae at
sun.misc.Unsafe.park (Native Method) at
java.util.concurrent.locks.LockSupport.park (LockSupport.java:175) at
java.util.concurrent.CompletableFuture$Signaller.block
(CompletableFuture.java:1707) at
java.util.concurrent.ForkJoinPool.managedBlock (ForkJoinPool.java:3323) at
java.util.concurrent.CompletableFuture.waitingGet
(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.join
(CompletableFuture.java:1947) at
org.apache.flink.runtime.rpc.akka.SupervisorActor.startAkkaRpcActor
(SupervisorActor.java:208) at
org.apache.flink.runtime.rpc.akka.AkkaRpcService.registerAkkaRpcActor
(AkkaRpcService.java:311) at
org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer
(AkkaRpcService.java:227) at org.apache.flink.runtime.rpc.RpcEndpoint.
(RpcEndpoint.java:127) at org.apache.flink.runtime.rpc.FencedRpcEndpoint.
(FencedRpcEndpoint.java:48) at org.apache.flink.runtime.jobmaster.JobMaster.
(JobMaster.java:235) at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService
(DefaultJobMasterServiceFactory.java:95) at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService
(DefaultJobMasterServiceFactory.java:39) at
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.
(JobManagerRunnerImpl.java:162) at
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner
(DefaultJobManagerRunnerFactory.java:86) at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5
(Dispatcher.java:478) at
org.apache.flink.runtime.dispatcher.Dispatcher$$Lambda$796/1707020487.get
(flink-akka.actor.default-dispatcher-4684) at
java.util.concurrent.CompletableFuture$AsyncSupply.run
(CompletableFuture.java:1604) at
java.util.concurrent.ThreadPoolExecutor.runWorker
(ThreadPoolExecutor.java:1149) at
java.util.concurrent.ThreadPoolExecutor$Worker.run
(ThreadPoolExecutor.java:624) at java.lang.Thread.run



--
Sent from: http://apache-flink.147419.n8.nabble.com/



Flink 1.12: SupervisorActor blocked when submitting a job via the REST API

2021-05-27, posted by seuzxc
Hi all,
After my Flink 1.12 deployment has been running for a while, submitting a job via the REST API blocks while creating the JobMaster's
corresponding actor. What usually causes this? The blocked thread info is below. (From the source code, this spot merely sends a message to the SupervisorActor to register an actor for the JobMaster; in theory it shouldn't consume any resources, yet my production environment sometimes blocks for an hour and then returns to normal.)

cluster-io-thread-14BLOCKED blocked on java.lang.Object@67bb821e owned by
"cluster-io-thread-16" Id=218
at org.apache.flink.runtime.rpc.akka.AkkaRpcService.registerAkkaRpcActor
(AkkaRpcService.java:308)
at org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer
(AkkaRpcService.java:227)
at org.apache.flink.runtime.rpc.RpcEndpoint.  (RpcEndpoint.java:127)
at org.apache.flink.runtime.rpc.FencedRpcEndpoint.
(FencedRpcEndpoint.java:48)
at org.apache.flink.runtime.jobmaster.JobMaster.  (JobMaster.java:235)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService
(DefaultJobMasterServiceFactory.java:95)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService
(DefaultJobMasterServiceFactory.java:39)
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.
(JobManagerRunnerImpl.java:162)
at
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner
(DefaultJobManagerRunnerFactory.java:86)
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5
(Dispatcher.java:478)
at org.apache.flink.runtime.dispatcher.Dispatcher$$Lambda$796/1707020487.get
(flink-akka.actor.default-dispatcher-4684)
at java.util.concurrent.CompletableFuture$AsyncSupply.run
(CompletableFuture.java:1604)
at java.util.concurrent.ThreadPoolExecutor.runWorker
(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run
(ThreadPoolExecutor.java:624)
at java.lang.Thread.run


cluster-io-thread-16WAITING waiting on
java.util.concurrent.CompletableFuture$Signaller@60dd3bae
at sun.misc.Unsafe.park (Native Method)
at java.util.concurrent.locks.LockSupport.park  (LockSupport.java:175)
at java.util.concurrent.CompletableFuture$Signaller.block
(CompletableFuture.java:1707)
at java.util.concurrent.ForkJoinPool.managedBlock   (ForkJoinPool.java:3323)
at java.util.concurrent.CompletableFuture.waitingGet
(CompletableFuture.java:1742)
at java.util.concurrent.CompletableFuture.join  (CompletableFuture.java:1947)
at org.apache.flink.runtime.rpc.akka.SupervisorActor.startAkkaRpcActor
(SupervisorActor.java:208)
at org.apache.flink.runtime.rpc.akka.AkkaRpcService.registerAkkaRpcActor
(AkkaRpcService.java:311)
at org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer
(AkkaRpcService.java:227)
at org.apache.flink.runtime.rpc.RpcEndpoint.  (RpcEndpoint.java:127)
at org.apache.flink.runtime.rpc.FencedRpcEndpoint.
(FencedRpcEndpoint.java:48)
at org.apache.flink.runtime.jobmaster.JobMaster.  (JobMaster.java:235)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService
(DefaultJobMasterServiceFactory.java:95)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService
(DefaultJobMasterServiceFactory.java:39)
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.
(JobManagerRunnerImpl.java:162)
at
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner
(DefaultJobManagerRunnerFactory.java:86)
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5
(Dispatcher.java:478)
at org.apache.flink.runtime.dispatcher.Dispatcher$$Lambda$796/1707020487.get
(flink-akka.actor.default-dispatcher-4684)
at java.util.concurrent.CompletableFuture$AsyncSupply.run
(CompletableFuture.java:1604)
at java.util.concurrent.ThreadPoolExecutor.runWorker
(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run
(ThreadPoolExecutor.java:624)
at java.lang.Thread.run













--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink dimension tables

2021-05-27, posted by Leonard Xu
Hi,
Yes, it can be modified, and no job restart is needed after the change.
After a change, the Flink fact stream can immediately query the latest dimension data (provided the looked-up data has no cache enabled).
You can think of the lookup dimension table syntax this way: A join B for system time as of A.proctime on A.id
= B.id means querying the current latest version of the dimension table (proctime stands for the latest time) and joining against it.
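As a minimal sketch of that syntax (connector choice and all names here are assumptions, not from this thread), a processing-time lookup join typically looks like:

-- Sketch only: `orders` declares `proc_time AS PROCTIME()` in its DDL, and
-- `customers` is e.g. a JDBC table; each probing row queries the database
-- for the current record matching customer_id, so dimension updates become
-- visible immediately (unless lookup caching is enabled).
SELECT o.order_id, c.customer_name
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.customer_id;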


Best,
Leonard

> On May 27, 2021 at 16:35, liujian <13597820...@qq.com> wrote:
> 
> When a Flink lookup table serves as a dimension table, does that mean the lookup table can't have inserts or updates, and the job must be restarted if it changes?
> How can this be done without restarting the job?



Re: How should a single quote inside a SQL DDL option value be handled?

2021-05-27, posted by zh0122
create table source (
id int
) with (
type='jdbc',
username='us',
password='ab''c'  -- a single quote inside a SQL string literal is escaped by doubling it
);


Luna Wong wrote on Thu, May 27, 2021 at 4:02 PM:

> create table source (
> id int
> ) with (
> type='jdbc',
> username='us',
> password='ab'c'
> );
>
> For example, the password value in the DDL above needs a literal single quote ('). How should this be handled?
>


Flink dimension tables

2021-05-27, posted by liujian
When a Flink lookup table serves as a dimension table, does that mean the lookup table can't have inserts or updates, and the job must be restarted if it changes? How can this be done without restarting the job?


How should a single quote inside a SQL DDL option value be handled?

2021-05-27, posted by Luna Wong
create table source (
id int
) with (
type='jdbc',
username='us',
password='ab'c'
);

For example, the password value in the DDL above needs a literal single quote ('). How should this be handled?