Re: 关于FlinkSQL的文档中flinksql的connector和catalog划分以及hive catalog和hive connector是否必须配合使用的问题。

2020-08-10 文章 Zhao,Yi(SEC)
如果使用了Hive catalog,我创建一个流式表,然后返回基于同一个HiveCatalog的spark-sql中,那个表能看到吗?如果尝试查询是不是会出错?
无法实验:我现在还没搞定,因为简单的配置ok,连接到了hive metastore,也通过 show 
tables看到了表,但select会出错(这个问题后续再说,现在就是想知道这种基于已有catalog的情况时是不是不太好,比较flink-sql特有流表)。

在 2020/8/10 下午8:24,“Danny Chan” 写入:

你好 ~

1. 你是只文档结构吗 ?catalog 是 flink SQL 管理表元数据信息的组件,通过注册 catalog 用户可以直接访问 catalog 
中的已存表,当然用户也可以通过 CREATE TABLE DDL 来创建对应的 connector 表
2. 访问 hive metastore 中的表示一定要用 hive catalog 的,如果是新建临时表(不持久化),也可以使用内置的 catalog

Best,
Danny Chan
在 2020年8月10日 +0800 PM8:14,Zhao,Yi(SEC) ,写道:
> 1 为什么flinksql 1.11中,JDBC 
Catalog通过简单的链接转给了connector,catalog和connector并不是同一个概念。我认为应该将jdbc connectior和jdbc 
catalog分开放入各自目录。
>
> 2 为什么flinksql1.11中,connector部分没有hive connector。而是在hive 
integration部分,以及catalogs中介绍。而且在 Table API & SQL/Hive Integration/Hive Read & 
Write 部分,第一句是“Using the HiveCatalog and Flink’s connector to Hive, Flink can 
read and write from Hive data as an alternative to Hive’s batch 
engine.”。难道不使用hivecatalog就不能读取hive数据?是这个意思嘛。感觉好像意思是hive读取特别,这种情况必须使用hive 
catalog。不可以使用jdbc catalog,但使用hive connector嘛?




FlinkSQL even time 定义问题

2020-08-12 文章 Zhao,Yi(SEC)
咨询下,FlinkSQl的event time必须在DDL中定义吗。能否DDL只是定义普通数据字段,比如有个time属性。
然后在select 的时候指定具体使用的watermark策略。
目的:假设基于同一个表A,我查询1需要使用watermark为time-1min,查询2需要使用watermark为time-2min。

其次除了这种case,如果我基于表1查询得到结果输出到表2,那么表2的event 
time定义呢?比如在表2的定义中基于表2的某个属性(比如叫time2),然后插入表2的时候只要time2属性存在就可以?


此外,如果对比datastream api的watermark传播机制,如果我希望查询1结构输出到表2,然后继续基于表2查询。貌似就需要
select xxx from (select yyy from t); 这种嵌套写法,一句sql会变成一个任务。
那如何用sql做非常复杂的任务组合呢,比如我是2句不搭嘎的sql的,就希望在同一个job中呢。


关于FlinkSQL的一些最佳实践咨询

2020-08-12 文章 Zhao,Yi(SEC)
最近刚刚开始研究FlinkSQL,之前一直用的DataStream API,现在想尝试下FlinkSQL的玩法。
目前简单看下来,感觉相对来说相比DataStream API不够灵活,所以有一些实践疑问咨询下。

(1) 通常Kafka的动态表会通过临时表的方式写在程序开头声明定义呢?还是直接写在sql-client的配置文件中呢。

如果写在配置文件,好处是类似于一个固化的表定义一样,每次执行sql-client的cli之后可以马上查询相关数据。但是有个问题是这种情况下kafka设置读取earliest还是latest?貌似根据具体查询需求临时改配置好像也不是很方便。

写临时表是不是更常用?比如你这个程序目的就是读取指定时间开始的数据,那就在代码中定义这个临时表的时候就指定了offset。如果是需要从最早开始读,代码中定义临时表就定义为最早。

(2) 
FlinkSQL生产中,主要是通过代码方式执行呢?还是将相关jar都放好到固定的机器,然后每次sql-client的命令行进去,然后执行一些sql(提交任务)。

(3) 代码中,直接sql api常用呢?还是table api常用呢?哪个更推荐?包括表的定义也有sql ddl方式,以及table 
api提供的一套方式。以及DML也一样,table api提供了一套更加面向对象的语法,sql api则提供了更高层次的抽象,同时类sql(技能栈更通用)。

(4) Flink-json这种格式包一般打包到任务jar中?还是提前放到sql-client的相关lib中?哪个更推荐。还有各种connector。

今天有个实验,报错就是没这些包,加了lib就ok啦。这个有一点不是很懂,是这些包仅client需要,还是说我通过sql-client.sh 
的-l参数指定的lib目录会被动态上传到集群呢?毕竟这些包我并没有手动上传到集群,只是在client端游,并通过-l指定之后就ok可用了。

(5) 
在Hive中实践,更多的是提前定义好的各种表结构,然后策略同学直接写sql做各种数据查询和分析。在FlinkSQL场景下,是不是每个代码中定义自己的临时表更常见呢?毕竟有Kafka这种动态表,每次启动一个任务的开始消费位置可能也不一样。


Flink sql TUMBLE window 不支持offset吗

2020-08-12 文章 Zhao,Yi(SEC)
如题,是不是不支持offset呀。在中国,天窗口实际应该是 INTVERVAL ‘1’ DAY + 8小时offset。
但是看了文档没发现添加offset的语法。


如果仅仅是时间显示问题还好说,搞个函数转一转,但分窗这个不提供入口没办法。


如何设置FlinkSQL并行度

2020-08-12 文章 Zhao,Yi(SEC)
看配置文件有 execution. Parallelism,但这个明显是全局类型的配置。
如何给Sql生成的数据源结点,window结点,sink结点等设置不同的并行度呢?

比如数据源理论上应该和kafka分区数一致比较好,window则需要根据数据量考虑计算压力,sink也应该有相应的场景考虑。



FlinkSQL命令行方式提交sql任务的jar依赖问题

2020-08-12 文章 Zhao,Yi(SEC)
背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
现在比较混乱,哪些jar需要放到A,哪些放到B。


(1) kafka ssl 
证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。

(2) 
flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。



总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?

目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。




Re: 关于FlinkSQL的一些最佳实践咨询

2020-08-12 文章 Zhao,Yi(SEC)
没有小伙伴帮忙看下这个问题吗。可以优先看下第2、3、4个问题。

其他问题,第1/5个问题,我自己大概感觉,直接使用memory 
catalog就ok啦。我看了下FlinkSQL好像没有临时表的概念。10个SQL开10个sql-client的session,各个session独享自己的memory
 catalog,创建的表也类似于临时表的效果。

发件人: "Zhao,Yi(SEC)" 
日期: 2020年8月12日 星期三 下午2:20
收件人: "user-zh@flink.apache.org" 
主题: 关于FlinkSQL的一些最佳实践咨询

最近刚刚开始研究FlinkSQL,之前一直用的DataStream API,现在想尝试下FlinkSQL的玩法。
目前简单看下来,感觉相对来说相比DataStream API不够灵活,所以有一些实践疑问咨询下。

(1) 通常Kafka的动态表会通过临时表的方式写在程序开头声明定义呢?还是直接写在sql-client的配置文件中呢。

如果写在配置文件,好处是类似于一个固化的表定义一样,每次执行sql-client的cli之后可以马上查询相关数据。但是有个问题是这种情况下kafka设置读取earliest还是latest?貌似根据具体查询需求临时改配置好像也不是很方便。

写临时表是不是更常用?比如你这个程序目的就是读取指定时间开始的数据,那就在代码中定义这个临时表的时候就指定了offset。如果是需要从最早开始读,代码中定义临时表就定义为最早。

(2) 
FlinkSQL生产中,主要是通过代码方式执行呢?还是将相关jar都放好到固定的机器,然后每次sql-client的命令行进去,然后执行一些sql(提交任务)。

(3) 代码中,直接sql api常用呢?还是table api常用呢?哪个更推荐?包括表的定义也有sql ddl方式,以及table 
api提供的一套方式。以及DML也一样,table api提供了一套更加面向对象的语法,sql api则提供了更高层次的抽象,同时类sql(技能栈更通用)。

(4) Flink-json这种格式包一般打包到任务jar中?还是提前放到sql-client的相关lib中?哪个更推荐。还有各种connector。

今天有个实验,报错就是没这些包,加了lib就ok啦。这个有一点不是很懂,是这些包仅client需要,还是说我通过sql-client.sh 
的-l参数指定的lib目录会被动态上传到集群呢?毕竟这些包我并没有手动上传到集群,只是在client端有,并通过-l指定之后就ok可用了。

(5) 
在Hive中实践,更多的是提前定义好的各种表结构,然后策略同学直接写sql做各种数据查询和分析。在FlinkSQL场景下,是不是每个代码中定义自己的临时表更常见呢?毕竟有Kafka这种动态表,每次启动一个任务的开始消费位置可能也不一样。


Re: Flink sql TUMBLE window 不支持offset吗

2020-08-12 文章 Zhao,Yi(SEC)
大概懂你意思,sql情况下event time都是Timestamp数据类型,这是一种-MM-dd 
HH:mm:ss格式的,从这个角度讲的确是不需要考虑offset的问题(只要周期是1min/1h/1d等类似)。只需要这个Timestamp是我需要的时区格式的日期即可。
其实我是联系DatastreamAPI的,因为DatastreamAPI中是基于unix timestamp(时间戳)来划分的。

在 2020/8/12 下午9:21,“Benchao Li” 写入:

Hi,

目前还没有支持offset,有一个issue[1] 在跟进解决这个问题。
但是你这个case应该是不需要offset就可以的,现在划分窗口的逻辑是直接按照毫秒级别的时间戳来整除进行计算的。
所以如果窗口是1min/1h/1d 这种,都是整点的窗口,没有问题的。如果你有这个问题,那应该是你的时间字段有时区问题。
有一种case是需要offset的,比如一周这种窗口,因为1980-1-1是周四,所以一周的窗口是从周四开始的。

[1] https://issues.apache.org/jira/browse/FLINK-17767

Zhao,Yi(SEC)  于2020年8月12日周三 下午8:15写道:

> 如题,是不是不支持offset呀。在中国,天窗口实际应该是 INTVERVAL ‘1’ DAY + 8小时offset。
> 但是看了文档没发现添加offset的语法。
>
>
> 如果仅仅是时间显示问题还好说,搞个函数转一转,但分窗这个不提供入口没办法。
>


-- 

Best,
Benchao Li




Re: FlinkSQL命令行方式提交sql任务的jar依赖问题

2020-08-13 文章 Zhao,Yi(SEC)
A是10机器集群(HA模式,独立集群),B作为提交机器。
从我实验效果来看,我是先启动一个sql-client的cli,如下命令:
./bin/sql-client.sh embedded -l $(pwd)/libs_sql -l $(pwd)/libs_udf
其中libs_sql中有:flink-connector-kafka_2.12-1.10.0.jar  
flink-connector-kafka-base_2.12-1.10.0.jar  flink-jdbc_2.12-1.10.0.jar  
flink-json-1.10.0.jar。然后A集群所有机器没有加这些包(Flink部署目录lib中没有)。A集群上其他任务提交的包中jar应该不致于会影响到我sql提交的任务。
结论是,我libs_sql中没有flink-json、flink-connector-kafka等的时候,提交sql任务会报错。加了的时候,提交sql任务不报错。
所以感觉貌似提交sql任务会将启动sql-client时候指定的lib相关包都上传吗?


在 2020/8/13 下午3:10,“Jeff Zhang” 写入:

你的10台机器是flink standalone还是 yarn集群 ?
如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。

另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
或者加入钉钉群讨论,钉钉群号: 32803524


Zhao,Yi(SEC)  于2020年8月13日周四 下午1:02写道:

> 背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
> 现在比较混乱,哪些jar需要放到A,哪些放到B。
>
>
> (1) kafka ssl
> 
证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
>
> (2)
>  
flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。
>
>
>
> 总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?
>
> 目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。
>
>
>

-- 
Best Regards

Jeff Zhang




Re: FlinkSQL命令行方式提交sql任务的jar依赖问题

2020-08-13 文章 Zhao,Yi(SEC)
补充:
刚刚翻了下源码,kafka那个直到原因了,见FlinkKafkaConsumer的288行,限定了必须是ByteArrayDeserializer,而且引用到了ByteArrayDeserializer类,这个是在new
 KafkaConsumer的过程就执行到的,所以这个依赖是提交端需要的。

按照  
的讲法,flink-sql按照-j或-l指定的包会被上传,这个倒也合理,毕竟有些任务特定需要一些包,提供这个功能肯定有用。
但像connector,json,csv这种非常通用的包感觉应该统一放入集群就好,但实际按照这个情况来看无法做到。
因为即使我把这些包统一放到了集群,实际提交段还是需要这些包,因为没有这些包提交sql时就直接报错了,于是还是需要通过-j或-l指定,然后进一步游会被上传?所以说,此处又涉及到一个flink集群上的包和sql-client提交的包重复的问题,一致还好,不一致情况下哪个优先呢?
___

在 2020/8/14 上午10:46,“Zhao,Yi(SEC)” 写入:

分析个报错,报错如下:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find 
a suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
从报错来看,是需要json的format,但实际只有csv。因为缺少json的format。
这个实验,是我将相关所有jar都放到集群的flink的lib目录并重启了集群。
但是提交sql(即执行sql-client.sh命令)的机器上没有这些依赖,报错如上。

所以,这个根据我的表定义去找对应的format,以及connector等的过程是在提交端做的吗?

还有一个更奇怪的,就算format,connector相关是提交端做的,但是我kafka的ssl证书路径的读取理论上肯定应该是在任务执行时候才会做,但当我执行select
 * from xxx提交sql之后马上报错了,报错为:
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.common.serialization.ByteArrayDeserializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

Kafka的序列化类理论上是作为kafkasource被创建时候的properties传入,然后kafkaConsumer执行期间才会发现这个class不存在吧。


___
在 2020/8/14 上午9:44,“godfrey he” 写入:

sql client 中通过 -j 或者 -l 的指定的包会被随着job提交的时候一起上传到jm。

Zhao,Yi(SEC)  于2020年8月13日周四 下午5:11写道:

> A是10机器集群(HA模式,独立集群),B作为提交机器。
> 从我实验效果来看,我是先启动一个sql-client的cli,如下命令:
> ./bin/sql-client.sh embedded -l $(pwd)/libs_sql -l $(pwd)/libs_udf
> 其中libs_sql中有:flink-connector-kafka_2.12-1.10.0.jar
> flink-connector-kafka-base_2.12-1.10.0.jar  flink-jdbc_2.12-1.10.0.jar
> 
flink-json-1.10.0.jar。然后A集群所有机器没有加这些包(Flink部署目录lib中没有)。A集群上其他任务提交的包中jar应该不致于会影响到我sql提交的任务。
>
> 
结论是,我libs_sql中没有flink-json、flink-connector-kafka等的时候,提交sql任务会报错。加了的时候,提交sql任务不报错。
> 所以感觉貌似提交sql任务会将启动sql-client时候指定的lib相关包都上传吗?
> 
>
> 在 2020/8/13 下午3:10,“Jeff Zhang” 写入:
>
> 你的10台机器是flink standalone还是 yarn集群 ?
> 如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。
>
> 另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
> https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
> 或者加入钉钉群讨论,钉钉群号: 32803524
>
>
> Zhao,Yi(SEC)  于2020年8月13日周四 下午1:02写道:
>
> > 背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
> > 现在比较混乱,哪些jar需要放到A,哪些放到B。
> >
> >
> > (1) kafka ssl
> >
> 
证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
> >
> > (2)
> >
> 
flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。
> >
> >
> >
> >
> 
总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?
> >
> > 目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。
> >
> >
> >
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>






Re: 回复:关于flink升级

2020-08-13 文章 Zhao,Yi(SEC)
经历了1.7到1.8,1.8到1.9,1.9到1.10;前2还好,最后一个有些坑,jdk8版本不要太旧,某个版本和1.10配合会有bug。

在 2020/8/14 上午9:25,“蒋佳成(Jiacheng Jiang)”<920334...@qq.com> 写入:


1.10有了新的内存模型,没弄清楚这些内存配置前,可能跑不起job!建议先弄清楚,在测试环境上先搞搞--原始邮件--
发件人:引领

Re: 如何设置FlinkSQL并行度

2020-08-13 文章 Zhao,Yi(SEC)
并行度问题有人帮忙解答下吗,此外补充个相关问题,除了并行度,flink-sql情况下,能做检查点/保存点,并基于检查点/保存点重启sql任务吗。

发件人: "Zhao,Yi(SEC)" 
日期: 2020年8月13日 星期四 上午11:44
收件人: "user-zh@flink.apache.org" 
主题: 如何设置FlinkSQL并行度

看配置文件有 execution. Parallelism,但这个明显是全局类型的配置。
如何给Sql生成的数据源结点,window结点,sink结点等设置不同的并行度呢?

比如数据源理论上应该和kafka分区数一致比较好,window则需要根据数据量考虑计算压力,sink也应该有相应的场景考虑。



Re: FlinkSQL命令行方式提交sql任务的jar依赖问题

2020-08-13 文章 Zhao,Yi(SEC)
分析个报错,报错如下:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
从报错来看,是需要json的format,但实际只有csv。因为缺少json的format。
这个实验,是我将相关所有jar都放到集群的flink的lib目录并重启了集群。
但是提交sql(即执行sql-client.sh命令)的机器上没有这些依赖,报错如上。

所以,这个根据我的表定义去找对应的format,以及connector等的过程是在提交端做的吗?
还有一个更奇怪的,就算format,connector相关是提交端做的,但是我kafka的ssl证书路径的读取理论上肯定应该是在任务执行时候才会做,但当我执行select
 * from xxx提交sql之后马上报错了,报错为:
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.common.serialization.ByteArrayDeserializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
Kafka的序列化类理论上是作为kafkasource被创建时候的properties传入,然后kafkaConsumer执行期间才会发现这个class不存在吧。

___
在 2020/8/14 上午9:44,“godfrey he” 写入:

sql client 中通过 -j 或者 -l 的指定的包会被随着job提交的时候一起上传到jm。

Zhao,Yi(SEC)  于2020年8月13日周四 下午5:11写道:

> A是10机器集群(HA模式,独立集群),B作为提交机器。
> 从我实验效果来看,我是先启动一个sql-client的cli,如下命令:
> ./bin/sql-client.sh embedded -l $(pwd)/libs_sql -l $(pwd)/libs_udf
> 其中libs_sql中有:flink-connector-kafka_2.12-1.10.0.jar
> flink-connector-kafka-base_2.12-1.10.0.jar  flink-jdbc_2.12-1.10.0.jar
> 
flink-json-1.10.0.jar。然后A集群所有机器没有加这些包(Flink部署目录lib中没有)。A集群上其他任务提交的包中jar应该不致于会影响到我sql提交的任务。
>
> 
结论是,我libs_sql中没有flink-json、flink-connector-kafka等的时候,提交sql任务会报错。加了的时候,提交sql任务不报错。
> 所以感觉貌似提交sql任务会将启动sql-client时候指定的lib相关包都上传吗?
> 
>
> 在 2020/8/13 下午3:10,“Jeff Zhang” 写入:
>
> 你的10台机器是flink standalone还是 yarn集群 ?
> 如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。
>
> 另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
> https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
    >     或者加入钉钉群讨论,钉钉群号: 32803524
>
>
> Zhao,Yi(SEC)  于2020年8月13日周四 下午1:02写道:
>
> > 背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
> > 现在比较混乱,哪些jar需要放到A,哪些放到B。
> >
> >
> > (1) kafka ssl
> >
> 
证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
> >
> > (2)
> >
> 
flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。
> >
> >
> >
> >
> 总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?
> >
> > 目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。
> >
> >
> >
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>




关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-08-11 文章 Zhao,Yi(SEC)
刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:
[cid:image001.png@01D66FF8.F697E2D0]
这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?


其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 
'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
root
|-- d: LEGACY('RAW', 'ANY')


Table api son schema

2020-08-11 文章 Zhao,Yi(SEC)
[cid:image001.png@01D66FF5.A3F680C0]


如上图,field api被标注过期。替换写法被注释掉,使用注视掉的写法会报错如下。

Exception in thread "main" org.apache.flink.table.api.TableException: A raw 
type backed by type information has no serializable string representation. It 
needs to be resolved into a proper raw type.
   at 
org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:97)
   at org.apache.flink.table.descriptors.Schema.field(Schema.java:88)
   at jobs.IpGapUserFt2.main(IpGapUserFt2.java:83)

不清楚有啥解决方法吗?


其次,我这边鼓捣了半天,发现使用json schema貌似也没办法实现整个表的动态结构。
我业务中json实际如下:
{
“d”:{
  “key”: value
…. … .  .. ..// 此处key动态扩展
}
}
我大概想法是d作为一个field,类型是map(但好像不支持map?必须用row)。用row呢,又必须指定其所有field,就不是动态扩展字段了。


Re: 请教关于部署的问题

2020-08-10 文章 Zhao,Yi(SEC)
配置分启动配置(根据配置觉得如何构建任务,需要本地构建打包,这部分配置必须是本地配置)。
还有运行时候需要读取的配置(比如kafka ssl证书等),这部分可以放到hdfs等分布式存储中(只是举例,像kafka ssl需要修改源码才支持hdfs)。
至于jar的话,如果只是希望分开,对于提交时候是不是希望也避免提交呢?
(1)60MB的大jar包。解决:直接提交。
(2)10个6MB的jar包。解决:不清楚,1.10貌似还不支持的样子。
(3)9个jar不需要提交,仅提交1个用户代码包。解决:提前将9个jar部署到集群的flink的lib下。

—
在 2020/8/11 上午11:16,“abc15...@163.com” 写入:

部署的能不能把依赖和配置文件单独出来指定,而不是打成一个jar,如果可以具体要怎么做?

发自我的iPhone



Re: 关于FlinkSQL的文档中flinksql的connector和catalog划分以及hive catalog和hive connector是否必须配合使用的问题。

2020-08-10 文章 Zhao,Yi(SEC)
好吧。其实我本来觉得 Catalog 和 Connector 
独立开会更好理解,结构也更清晰。比如,按照我的想法,每种Catalog的实现,相当于针对各种主流数据源的表都对应某种元数据存储的方式,比如jdbc中存储了hive表的元数据等。
当然这只是想法,不清楚是否有方法官方维护一个Catalog(比如基于jdbc感觉相对方便,即持久化,更大众;毕竟我记得好像hive也支持jdbc的metastore来着),然后这个Catalog不断支持更多的主流数据源。
甚至,如果不可行。我认为读写hive数据不应该复用hive原先的metastore,我是考虑kafka流表的问题。如果flink-sql创建了一个流表保存到hive
 catalog,那么hive-sql或者spark-sql去读取这个表岂不是会报错什么的?

在 2020/8/11 上午10:26,“Rui Li” 写入:

> 不可以使用jdbc catalog,但使用hive connector嘛?


关于这一点稍微补充一下,我们目前访问hive元数据要求必须启动一个HMS,然后我们通过这个HMS来读写元数据(HiveCatalog就是用来对接HMS的),而不是直接去读底层的DBMS的,所以jdbc
catalog是读不了hive元数据的。

On Tue, Aug 11, 2020 at 9:32 AM Leonard Xu  wrote:

> Hi Zhao
>
> > 1 为什么flinksql 1.11中,JDBC
> Catalog通过简单的链接转给了connector,catalog和connector并不是同一个概念。我认为应该将jdbc
> connectior和jdbc catalog分开放入各自目录。
>
>
> 两个是不同的概念,JDBC catalog 可以 包含在 JDBC connector 里,你可以理解 JDBC connector 是 Flink
> 与 JDBC 交互的连接器,Catalog也属于交互的一部分。JDBC connector里不只是数据的读取/写入 JDBC,也包括了JDBC
> dialect 和 JDBC catalog等, JDBC catalog 目前能读取的表都是 JDBC(目前支持Pg) 的表,JDBC
> catalog 读取表/库 的逻辑 和 JDBC 读取/写入数据 有很多的复用的逻辑。
>
>
> > 2  为什么flinksql1.11中,connector部分没有hive connector。而是在hive
> integration部分,以及catalogs中介绍。而且在 Table API & SQL/Hive Integration/Hive Read
> & Write 部分,第一句是“Using the HiveCatalog and Flink’s connector to Hive, Flink
> can read and write from Hive data as an alternative to Hive’s batch 
engine.”
> Hive 是Hadoop上一个比较大的生态系统,同时hive也有自己的计算引擎(batch),Flink 不是简单地 链接 到hive,而是可以作为
> Hive
> 
的一个替代的计算引擎,除了读取/写入数据到Hive外,用户Hive里的SQL作业完全可以用Flink跑,这已经脱离了简单的交互的范畴了,所以会放在集成部分。
>
> > 。难道不使用hivecatalog就不能读取hive数据?是这个意思嘛。感觉好像意思是hive读取特别,这种情况必须使用hive
> catalog。不可以使用jdbc catalog,但使用hive connector嘛?
>
> 是的,不是特别,而是HiveCatalog 就是用来管理
> Hive中表、库、函数的元数据中心,用这个HiveCatalog也是很自然的事情。目前不可以使用JDBC
> catalog,很长一段时间也应该不行,Jdbc catalog 里存的表都是DB里的表,不支持存放Hive的表。
>
>
> 祝好
> Leonard Xu



-- 
Best regards!
Rui Li




Re: 关于FlinkSQL的文档中flinksql的connector和catalog划分以及hive catalog和hive connector是否必须配合使用的问题。

2020-08-10 文章 Zhao,Yi(SEC)
是的。我更多是纠结文档结构容易造成混淆。我认为catalog和connector是相对独立的概念。最对算是有点关系。
但是根据其他人的回答,目前来看,这2者还真没办法完全独立。比如jdbc connector就是不支持hive表。读写hive表还就是需要hive 
catalog。于是我刚刚回了另一封邮件写到,这种case下,我认为实践中,可以单独搞一个hive 
metastore仅仅服务于flink,hive和spark-sql则使用另一个hive 
metastore。这样去独立出来,避免出现流表被spark,hive可见。
__

在 2020/8/11 上午10:35,“Rui Li” 写入:


你是想问Flink通过HiveCatalog创建的流式表在SparkSQL中是不是可见么?Flink通过HiveCatalog创建的流式表在HMS中也是作为一张普通的表存在的,所以我理解SparkSQL如果对接同一个HMS的话也是可以看到这张表的。但不管是Hive还是SparkSQL,尝试查询这个流式表应该都会出错,目前这一点是需要用户自己保证的,比如可以通过不同的DB来做划分。

On Mon, Aug 10, 2020 at 8:43 PM Zhao,Yi(SEC)  wrote:

> 如果使用了Hive
> catalog,我创建一个流式表,然后返回基于同一个HiveCatalog的spark-sql中,那个表能看到吗?如果尝试查询是不是会出错?
> 无法实验:我现在还没搞定,因为简单的配置ok,连接到了hive metastore,也通过 show
> 
tables看到了表,但select会出错(这个问题后续再说,现在就是想知道这种基于已有catalog的情况时是不是不太好,比较flink-sql特有流表)。
>
> 在 2020/8/10 下午8:24,“Danny Chan” 写入:
>
> 你好 ~
>
> 1. 你是只文档结构吗 ?catalog 是 flink SQL 管理表元数据信息的组件,通过注册 catalog 用户可以直接访问
> catalog 中的已存表,当然用户也可以通过 CREATE TABLE DDL 来创建对应的 connector 表
> 2. 访问 hive metastore 中的表示一定要用 hive catalog 的,如果是新建临时表(不持久化),也可以使用内置的
> catalog
>
> Best,
> Danny Chan
> 在 2020年8月10日 +0800 PM8:14,Zhao,Yi(SEC) ,写道:
> > 1 为什么flinksql 1.11中,JDBC
> Catalog通过简单的链接转给了connector,catalog和connector并不是同一个概念。我认为应该将jdbc
> connectior和jdbc catalog分开放入各自目录。
> >
> > 2 为什么flinksql1.11中,connector部分没有hive connector。而是在hive
> integration部分,以及catalogs中介绍。而且在 Table API & SQL/Hive Integration/Hive Read
> & Write 部分,第一句是“Using the HiveCatalog and Flink’s connector to Hive, Flink
> can read and write from Hive data as an alternative to Hive’s batch
> engine.”。难道不使用hivecatalog就不能读取hive数据?是这个意思嘛。感觉好像意思是hive读取特别,这种情况必须使用hive
> catalog。不可以使用jdbc catalog,但使用hive connector嘛?
>
>
>

-- 
Best regards!
Rui Li




一系列关于基于状态重启任务的问题

2020-08-10 文章 Zhao,Yi(SEC)
请教几个关于基于状态重启的问题。
问题1:基于检查点/保存点启动时候能否指定部分结点不使用状态。
为什么有这么个需求呢,下面说下背景。
任务A:5分钟粒度的统计PV,使用event time,每10s一次触发更新到数据库。
任务B:天级别任务,利用了状态。
如上任务A和B,我整合为一个大任务提交到flink执行。假设有某种场景下,某些数据错误等,我需要做修复等。并且修复方案需要能做到:从指定时间开始运行(这个是我基于kafkaSouce设置开始时间实现),同时配合一个时间范围过滤算子实现。但是flink如果基于状态重启,则kafkaSouce的offset会基于状态中的offset来做,而不是我配置的开始时间来做。但我又不能不基于状态重启,因为还有任务B是不可容忍丢失状态的。

这种情况怎么搞呢?当然通过flink提供的状态操作API去修改状态可能是一种方式,但感觉成本挺高。或者从保存下的保存点/检查点的路径来看,有没有可能从名字看出哪个状态文件是哪个结点的呢?我能否简单找到kafkaSouce结点的状态文件删除,并且配合flink提供的—allowNonRestoredState实现KafkaSouce不基于状态重启,而其他结点基于状态重启呢?当然也不清楚即使这可行,那么这种情况下KafkaSource是否会按照我设置的开始时间去消费。

问题2:任务合并或拆分问题。
拆分:
仍然假设有任务A和任务B,放在同一个JOB中。如果业务需要拆分开,这个相对容易实现。我只需要做个保存点。然后启动基于保存点任务A(配合—allowNonRestoredState,任务B的状态会被忽略)。再然后启动任务B(配合—allowNonRestoredState,任务A的状态会被忽略)。
合并:
问题来了,合并case怎么做。任务A和任务B如想合并怎么做呢?还是之前那个想法,状态文件结构是否可以直接合并到一起呢?比如任务的保存点文件夹和任务B的保存点文件夹合并后是否可以直接被用?
当然我并不清楚检查点和保存点的保存文件夹中文件命名的含义是否是结点uid啥的。是否可行呢?


Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-08-11 文章 Zhao,Yi(SEC)
刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:

这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?

stEnv.connect(
new Kafka()
.properties(TestKafkaUtils.getKafkaProperties())
.version("universal")
.topic("test")
.startFromLatest()
).withFormat(new Json()
.failOnMissingField(false)
).withSchema(
new Schema()
.field("d", TypeInformation.of(Map.class))
).inAppendMode().createTemporaryTable("t");

其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 
'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
root
|-- d: LEGACY('RAW', 'ANY')


在 2020/8/11 下午4:23,“zhao liang” 写入:

    Hi,你图挂了,换个图床试试呢

发件人: Zhao,Yi(SEC) 
日期: 星期二, 2020年8月11日 16:04
收件人: user-zh@flink.apache.org 
主题: 关于FLinkSQL如何创建类json无限扩展的表结构问题
刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:
[cid:image001.png@01D66FF8.F697E2D0]
这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?


其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 
'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
root
|-- d: LEGACY('RAW', 'ANY')




Re: 如何设置FlinkSQL并行度

2020-08-16 文章 Zhao,Yi(SEC)
我这边才研究FlinkSQL没几天。不过按照目前了解,是不支持算子级别并行度设置的。
此外你说的checkpoint无法正常触发,我估计是因为barrier的问题,部分并行示例没有分区数据,导致没数据就可能导致。这个问题类似,可能无解。

非要解决可以写代码,把souce部分不使用sql实现。
__

在 2020/8/15 下午8:21,“forideal” 写入:

Hi 赵一旦,


目前 Flink SQL 我这边使用也是无法指定各个算子的并行度。目前我这边遇到两个问题。
1.并行度超过 topic partition 的时候会造成资源浪费
2.并行度超过 topic partition 后,checkpoint 也无法正常触发了


Best forideal

















在 2020-08-14 12:03:32,"赵一旦"  写道:
>检查点呢,大多数用FlinkSQL的同学们,你们的任务是随时可运行那种吗,不是必须保证不可间断的准确性级别吗?
>
>Xingbo Huang  于2020年8月14日周五 下午12:01写道:
>
>> Hi,
>>
>> 关于并行度的问题,据我所知,目前Table API上还没法对每一个算子单独设置并行度
>>
    >> Best,
>> Xingbo
>>
>> Zhao,Yi(SEC)  于2020年8月14日周五 上午10:49写道:
>>
>> > 
并行度问题有人帮忙解答下吗,此外补充个相关问题,除了并行度,flink-sql情况下,能做检查点/保存点,并基于检查点/保存点重启sql任务吗。
>> >
>> > 发件人: "Zhao,Yi(SEC)" 
>> > 日期: 2020年8月13日 星期四 上午11:44
>> > 收件人: "user-zh@flink.apache.org" 
>> > 主题: 如何设置FlinkSQL并行度
>> >
>> > 看配置文件有 execution. Parallelism,但这个明显是全局类型的配置。
>> > 如何给Sql生成的数据源结点,window结点,sink结点等设置不同的并行度呢?
>> >
>> > 比如数据源理论上应该和kafka分区数一致比较好,window则需要根据数据量考虑计算压力,sink也应该有相应的场景考虑。
>> >
>> >
>>




Re: 如何在KeyedProcessFunction中获取processingTime

2020-08-16 文章 Zhao,Yi(SEC)
根据Context获取timerService,然后获取处理时间即可。


在 2020/8/16 下午7:57,“ゞ野蠻遊戲χ” 写入:

大家好

   
当我在KeyedProcessFunction的processElement方法中获取processingTime,就像这样ctx.timestamp(),返回为null,我改如何在processElement中获取processingTime?


谢谢!
嘉治