SQL 管理表元数据信息的组件,通过注册 catalog 用户可以直接访问 catalog
中的已存表,当然用户也可以通过 CREATE TABLE DDL 来创建对应的 connector 表
2. 访问 hive metastore 中的表示一定要用 hive catalog 的,如果是新建临时表(不持久化),也可以使用内置的 catalog
Best,
Danny Chan
在 2020年8月10日 +0800 PM8:14,Zhao,Yi(SEC) ,写道:
> 1 为什么flinksql 1.11中,JDBC
Catalo
咨询下,FlinkSQl的event time必须在DDL中定义吗。能否DDL只是定义普通数据字段,比如有个time属性。
然后在select 的时候指定具体使用的watermark策略。
目的:假设基于同一个表A,我查询1需要使用watermark为time-1min,查询2需要使用watermark为time-2min。
其次除了这种case,如果我基于表1查询得到结果输出到表2,那么表2的event
time定义呢?比如在表2的定义中基于表2的某个属性(比如叫time2),然后插入表2的时候只要time2属性存在就可以?
此外,如果对比datastream
最近刚刚开始研究FlinkSQL,之前一直用的DataStream API,现在想尝试下FlinkSQL的玩法。
目前简单看下来,感觉相对来说相比DataStream API不够灵活,所以有一些实践疑问咨询下。
(1) 通常Kafka的动态表会通过临时表的方式写在程序开头声明定义呢?还是直接写在sql-client的配置文件中呢。
如果写在配置文件,好处是类似于一个固化的表定义一样,每次执行sql-client的cli之后可以马上查询相关数据。但是有个问题是这种情况下kafka设置读取earliest还是latest?貌似根据具体查询需求临时改配置好像也不是很方便。
如题,是不是不支持offset呀。在中国,天窗口实际应该是 INTVERVAL ‘1’ DAY + 8小时offset。
但是看了文档没发现添加offset的语法。
如果仅仅是时间显示问题还好说,搞个函数转一转,但分窗这个不提供入口没办法。
看配置文件有 execution. Parallelism,但这个明显是全局类型的配置。
如何给Sql生成的数据源结点,window结点,sink结点等设置不同的并行度呢?
比如数据源理论上应该和kafka分区数一致比较好,window则需要根据数据量考虑计算压力,sink也应该有相应的场景考虑。
背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
现在比较混乱,哪些jar需要放到A,哪些放到B。
(1) kafka ssl
证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
(2)
没有小伙伴帮忙看下这个问题吗。可以优先看下第2、3、4个问题。
其他问题,第1/5个问题,我自己大概感觉,直接使用memory
catalog就ok啦。我看了下FlinkSQL好像没有临时表的概念。10个SQL开10个sql-client的session,各个session独享自己的memory
catalog,创建的表也类似于临时表的效果。
发件人: "Zhao,Yi(SEC)"
日期: 2020年8月12日 星期三 下午2:20
收件人: "user-zh@flink.apache.org"
主题: 关于FlinkSQL的一些最佳实践
。
但是你这个case应该是不需要offset就可以的,现在划分窗口的逻辑是直接按照毫秒级别的时间戳来整除进行计算的。
所以如果窗口是1min/1h/1d 这种,都是整点的窗口,没有问题的。如果你有这个问题,那应该是你的时间字段有时区问题。
有一种case是需要offset的,比如一周这种窗口,因为1980-1-1是周四,所以一周的窗口是从周四开始的。
[1] https://issues.apache.org/jira/browse/FLINK-17767
Zhao,Yi(SEC) 于2020年8月12日周三 下午8:15写道
standalone还是 yarn集群 ?
如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。
另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
或者加入钉钉群讨论,钉钉群号: 32803524
Zhao,Yi(SEC) 于2020年8月13日周四 下午1:02写道:
>
提交段还是需要这些包,因为没有这些包提交sql时就直接报错了,于是还是需要通过-j或-l指定,然后进一步游会被上传?所以说,此处又涉及到一个flink集群上的包和sql-client提交的包重复的问题,一致还好,不一致情况下哪个优先呢?
___
在 2020/8/14 上午10:46,“Zhao,Yi(SEC)” 写入:
分析个报错,报错如下:
[ERROR] Could not execute SQL statement. Reason
经历了1.7到1.8,1.8到1.9,1.9到1.10;前2还好,最后一个有些坑,jdk8版本不要太旧,某个版本和1.10配合会有bug。
在 2020/8/14 上午9:25,“蒋佳成(Jiacheng Jiang)”<920334...@qq.com> 写入:
1.10有了新的内存模型,没弄清楚这些内存配置前,可能跑不起job!建议先弄清楚,在测试环境上先搞搞--原始邮件--
发件人:引领
并行度问题有人帮忙解答下吗,此外补充个相关问题,除了并行度,flink-sql情况下,能做检查点/保存点,并基于检查点/保存点重启sql任务吗。
发件人: "Zhao,Yi(SEC)"
日期: 2020年8月13日 星期四 上午11:44
收件人: "user-zh@flink.apache.org"
主题: 如何设置FlinkSQL并行度
看配置文件有 execution. Parallelism,但这个明显是全局类型的配置。
如何给Sql生成的数据源结点,window结点,sink结点等设置不同的并行度呢?
比如数据源理论上应该和kaf
间才会发现这个class不存在吧。
___
在 2020/8/14 上午9:44,“godfrey he” 写入:
sql client 中通过 -j 或者 -l 的指定的包会被随着job提交的时候一起上传到jm。
Zhao,Yi(SEC) 于2020年8月13日周四 下午5:11写道:
> A是10机器集群(HA模式,独立集群),
刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:
[cid:image001.png@01D66FF8.F697E2D0]
这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?
其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW',
'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
root
|-- d: LEGACY('RAW', 'ANY')
[cid:image001.png@01D66FF5.A3F680C0]
如上图,field api被标注过期。替换写法被注释掉,使用注视掉的写法会报错如下。
Exception in thread "main" org.apache.flink.table.api.TableException: A raw
type backed by type information has no serializable string representation. It
needs to be resolved into a proper raw type.
at
配置分启动配置(根据配置觉得如何构建任务,需要本地构建打包,这部分配置必须是本地配置)。
还有运行时候需要读取的配置(比如kafka ssl证书等),这部分可以放到hdfs等分布式存储中(只是举例,像kafka ssl需要修改源码才支持hdfs)。
至于jar的话,如果只是希望分开,对于提交时候是不是希望也避免提交呢?
(1)60MB的大jar包。解决:直接提交。
(2)10个6MB的jar包。解决:不清楚,1.10貌似还不支持的样子。
(3)9个jar不需要提交,仅提交1个用户代码包。解决:提前将9个jar部署到集群的flink的lib下。
好吧。其实我本来觉得 Catalog 和 Connector
独立开会更好理解,结构也更清晰。比如,按照我的想法,每种Catalog的实现,相当于针对各种主流数据源的表都对应某种元数据存储的方式,比如jdbc中存储了hive表的元数据等。
当然这只是想法,不清楚是否有方法官方维护一个Catalog(比如基于jdbc感觉相对方便,即持久化,更大众;毕竟我记得好像hive也支持jdbc的metastore来着),然后这个Catalog不断支持更多的主流数据源。
/11 上午10:35,“Rui Li” 写入:
你是想问Flink通过HiveCatalog创建的流式表在SparkSQL中是不是可见么?Flink通过HiveCatalog创建的流式表在HMS中也是作为一张普通的表存在的,所以我理解SparkSQL如果对接同一个HMS的话也是可以看到这张表的。但不管是Hive还是SparkSQL,尝试查询这个流式表应该都会出错,目前这一点是需要用户自己保证的,比如可以通过不同的DB来做划分。
On Mon, Aug 10, 2020 at 8:43 PM Zhao,Yi(SEC) wrote:
> 如
请教几个关于基于状态重启的问题。
问题1:基于检查点/保存点启动时候能否指定部分结点不使用状态。
为什么有这么个需求呢,下面说下背景。
任务A:5分钟粒度的统计PV,使用event time,每10s一次触发更新到数据库。
任务B:天级别任务,利用了状态。
class))
).inAppendMode().createTemporaryTable("t");
其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW',
'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
root
|-- d: LEGACY('RAW', 'ANY')
在 2020/8/11 下午4:23,“zhao liang” 写入:
Hi,你图挂了,换个图床试试呢
发件人: Zhao,Yi(SEC)
日期
Xingbo Huang 于2020年8月14日周五 下午12:01写道:
>
>> Hi,
>>
>> 关于并行度的问题,据我所知,目前Table API上还没法对每一个算子单独设置并行度
>>
>> Best,
>> Xingbo
>>
>> Zhao,Yi(SEC) 于2020年8月14日周五 上午10:49写道:
>>
>> >
并行度问题有人帮忙解答下吗,此外
根据Context获取timerService,然后获取处理时间即可。
在 2020/8/16 下午7:57,“ゞ野蠻遊戲χ” 写入:
大家好
当我在KeyedProcessFunction的processElement方法中获取processingTime,就像这样ctx.timestamp(),返回为null,我改如何在processElement中获取processingTime?
谢谢!
嘉治
22 matches
Mail list logo