如果使用了Hive catalog,我创建一个流式表,然后返回基于同一个HiveCatalog的spark-sql中,那个表能看到吗?如果尝试查询是不是会出错?
无法实验:我现在还没搞定,因为简单的配置ok,连接到了hive metastore,也通过 show
tables看到了表,但select会出错(这个问题后续再说,现在就是想知道这种基于已有catalog的情况时是不是不太好,比较flink-sql特有流表)。
在 2020/8/10 下午8:24,“Danny Chan” 写入:
你好 ~
1. 你是只文档结构吗 ?catalog 是 flink
通常使用 preJob 模式的更多,考虑点主要是资源隔离,缺点是作业需要等到 JM/TM 启动后才能运行(作业启动速度会慢一些),session
模式刚好相反,你需要评估你们的场景。
--
Best,
Matt Wang
在2020年08月10日 16:21,Dream-底限 写道:
hi、
FlinkOnYarn集群部署是推荐使用yarn-session模式所有任务共用一个,还是推荐使用preJob模式每个任务起一个小集群
这个应该根据业务特性来决定吧。
如果是一些大型的streaming任务,需要长期稳定运行并且有良好的隔离性,则可以考虑perjob模式。
如果需要经常性提交一些小任务(常见于batch任务)或者说有一批相关联的任务,彼此隔离性要求也不高的,可以考虑session模式。
感觉说到底还是业务隔离性与资源的权衡。
Dream-底限 于2020年8月10日周一 下午4:21写道:
> hi、
> FlinkOnYarn集群部署是推荐使用yarn-session模式所有任务共用一个,还是推荐使用preJob模式每个任务起一个小集群
>
“By default, Flink allows subtasks to share slots even if they are subtasks
of different tasks, so long as they are from the same job.”
参考官网描述[1],应该只有相同job下的task可以共享slot。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html
wangl...@geekplus.com
下面是附件的内容,请问是因为什么导致重启呢?
2阶段提交demo:
@Slf4j public class CommonOracleSink extends
TwoPhaseCommitSinkFunctionhttp://node3:39469 was granted leadership with
leaderSessionID=----2020-08-10 16:37:34,312
INFO
Hi,这个日志全是有点头大。。。
我刚想到,除了task重启外,还有一种情况是task没有调度成功。
你能通过flink web ui观察到task的状态吗,都是RUNNING吗?
如果一直是schedule,那应该是缺少对应的资源进行调度,需要检查下task manager提供的slot资源以及任务所需的资源。
如果是running、failed、schedule的不断切换,那需要检查task manager的日志,应该有warn。
Bruce 于2020年8月10日周一 下午6:12写道:
> 下面是附件的内容,请问是因为什么导致重启呢?
>
>
>
Hi,
我看了一下mini-batch的聚合函数的实现,的确是没有开启状态清理,我建了一个issue[1] 来跟进修复这个bug。
[1] https://issues.apache.org/jira/browse/FLINK-18872
op <520075...@qq.com> 于2020年8月10日周一 下午4:49写道:
> hi
> grouby count(*)不是吗
>
>
>
>
> --原始邮件--
> 发件人:
>
你好 ~
1. 你是只文档结构吗 ?catalog 是 flink SQL 管理表元数据信息的组件,通过注册 catalog 用户可以直接访问 catalog
中的已存表,当然用户也可以通过 CREATE TABLE DDL 来创建对应的 connector 表
2. 访问 hive metastore 中的表示一定要用 hive catalog 的,如果是新建临时表(不持久化),也可以使用内置的 catalog
Best,
Danny Chan
在 2020年8月10日 +0800 PM8:14,Zhao,Yi(SEC) ,写道:
> 1 为什么flinksql
flink官网有一个kerberos相关的说明文档[1],不知是否能帮助到你。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-kerberos.html
zjfpla...@hotmail.com 于2020年8月10日周一 下午3:15写道:
> Flink on yarn模式
>
>
>
> zjfpla...@hotmail.com
>
> 发件人: zjfpla...@hotmail.com
> 发送时间: 2020-08-10 15:09
> 收件人:
I think this sounds good. +1
On Wed, Aug 5, 2020 at 8:37 PM jincheng sun
wrote:
> Hi David, Thank you for sharing the problems with the current document,
> and I agree with you as I also got the same feedback from Chinese users. I
> am often contacted by users to ask questions such as whether
请教各位
我用的是 Beam 2.23.0, with flink runner 1.8.2. 想要实验启动checkpoint 和 Beam KafkaIO
EOS(exactly once semantics) 以后,添加或删除source/sink operator
然后从savepoint恢复作业的情况。我是在电脑上run kafka 和 flink cluster (1 job manager, 1 task
manager)
下面是我尝试的不同场景:
1. 在SAVEPOINT 后,添加一个source topic
在savepoint之前: read from
最近在做一个报表的项目,5分钟和小时的报表采用Flink计算,遇到下面几个问题,请大佬帮忙解答下,谢谢。
输入的原始数据流包含了几十个维度和指标字段,然后会抽取其中的2~3个维度和若干指标进行汇聚计算,
有些还需要计算分组TOPN,还有任务依赖,先计算3个维度,然后从3个维度计算两个维度。
我当前的实现流图是:
中间数据都是使用Row来传递,最后将Row转换成Avro的Record写入HDFS。
现在单个时间粒度要计算近300张报表,任务图太复杂,我将计算任务分到了4个Job(1.7.2版本jobgraph过大提交不上去),每个处理两个时粒度的140张报表,
当前遇到的问题:
最近在做一个报表的项目,5分钟和小时的报表采用Flink计算,遇到下面几个问题,请大佬帮忙解答下,谢谢。
输入的原始数据流包含了几十个维度和指标字段,然后会抽取其中的2~3个维度和若干指标进行汇聚计算,
有些还需要计算分组TOPN,还有任务依赖,先计算3个维度,然后从3个维度计算两个维度。
我当前的实现流图是:
中间数据都是使用Row来传递,最后将Row转换成Avro的Record写入HDFS。
现在单个时间粒度要计算近300张报表,任务图太复杂,我将计算任务分到了4个Job(1.7.2版本jobgraph过大提交不上去),每个处理两个时粒度的140张报表,
当前遇到的问题:
Hi,
我觉得是时候考虑把hive文档移到connector里了,我们没必要割裂它们
Best,
Jingsong
On Tue, Aug 11, 2020 at 10:39 AM Zhao,Yi(SEC) wrote:
> 是的。我更多是纠结文档结构容易造成混淆。我认为catalog和connector是相对独立的概念。最对算是有点关系。
> 但是根据其他人的回答,目前来看,这2者还真没办法完全独立。比如jdbc connector就是不支持hive表。读写hive表还就是需要hive
>
flink.apache.orgExample
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;
public class Example {
public static void main(String[] args)
配置分启动配置(根据配置觉得如何构建任务,需要本地构建打包,这部分配置必须是本地配置)。
还有运行时候需要读取的配置(比如kafka ssl证书等),这部分可以放到hdfs等分布式存储中(只是举例,像kafka ssl需要修改源码才支持hdfs)。
至于jar的话,如果只是希望分开,对于提交时候是不是希望也避免提交呢?
(1)60MB的大jar包。解决:直接提交。
(2)10个6MB的jar包。解决:不清楚,1.10貌似还不支持的样子。
(3)9个jar不需要提交,仅提交1个用户代码包。解决:提前将9个jar部署到集群的flink的lib下。
Hi Zhao
> 1 为什么flinksql 1.11中,JDBC
> Catalog通过简单的链接转给了connector,catalog和connector并不是同一个概念。我认为应该将jdbc
> connectior和jdbc catalog分开放入各自目录。
两个是不同的概念,JDBC catalog 可以 包含在 JDBC connector 里,你可以理解 JDBC connector 是 Flink 与
JDBC 交互的连接器,Catalog也属于交互的一部分。JDBC connector里不只是数据的读取/写入 JDBC,也包括了JDBC dialect 和
Thank you for your positive feedback Seth !
Would you please vote in the voting mail thread. Thank you!
Best,
Jincheng
Seth Wiesman 于2020年8月10日周一 下午10:34写道:
> I think this sounds good. +1
>
> On Wed, Aug 5, 2020 at 8:37 PM jincheng sun
> wrote:
>
>> Hi David, Thank you for sharing the
好吧。其实我本来觉得 Catalog 和 Connector
独立开会更好理解,结构也更清晰。比如,按照我的想法,每种Catalog的实现,相当于针对各种主流数据源的表都对应某种元数据存储的方式,比如jdbc中存储了hive表的元数据等。
当然这只是想法,不清楚是否有方法官方维护一个Catalog(比如基于jdbc感觉相对方便,即持久化,更大众;毕竟我记得好像hive也支持jdbc的metastore来着),然后这个Catalog不断支持更多的主流数据源。
> 可以单独搞一个hive metastore仅仅服务于flink,hive和spark-sql则使用另一个hive metastore
是的,完全可以这样用。只是我们代码里没有做这种限制,因为不一定适用于所以用户。另外我印象中SparkSQL也会在HMS存它特定的表的,Hive去读这种表的话可能不会报错,但应该读不到数据。
文档结构确实可以考虑优化一下,跟其他connector保持一致。
On Tue, Aug 11, 2020 at 10:39 AM Zhao,Yi(SEC) wrote:
>
是的。我更多是纠结文档结构容易造成混淆。我认为catalog和connector是相对独立的概念。最对算是有点关系。
但是根据其他人的回答,目前来看,这2者还真没办法完全独立。比如jdbc connector就是不支持hive表。读写hive表还就是需要hive
catalog。于是我刚刚回了另一封邮件写到,这种case下,我认为实践中,可以单独搞一个hive
metastore仅仅服务于flink,hive和spark-sql则使用另一个hive
metastore。这样去独立出来,避免出现流表被spark,hive可见。
__
在
flink??1.10??
6 Wilma: age 35
5 Fred: age 35
??flink??
----
??:
请教几个关于基于状态重启的问题。
问题1:基于检查点/保存点启动时候能否指定部分结点不使用状态。
为什么有这么个需求呢,下面说下背景。
任务A:5分钟粒度的统计PV,使用event time,每10s一次触发更新到数据库。
任务B:天级别任务,利用了状态。
Hi,
关键的图挂了,邮件里上传图片经常挂,可以用图床工具发个链接。
Best
Leonard
> 在 2020年8月10日,23:35,lfgy <15029270...@163.com> 写道:
>
> 最近在做一个报表的项目,5分钟和小时的报表采用Flink计算,遇到下面几个问题,请大佬帮忙解答下,谢谢。
> 输入的原始数据流包含了几十个维度和指标字段,然后会抽取其中的2~3个维度和若干指标进行汇聚计算,
> 有些还需要计算分组TOPN,还有任务依赖,先计算3个维度,然后从3个维度计算两个维度。
> 我当前的实现流图是:
>
你是想问Flink通过HiveCatalog创建的流式表在SparkSQL中是不是可见么?Flink通过HiveCatalog创建的流式表在HMS中也是作为一张普通的表存在的,所以我理解SparkSQL如果对接同一个HMS的话也是可以看到这张表的。但不管是Hive还是SparkSQL,尝试查询这个流式表应该都会出错,目前这一点是需要用户自己保证的,比如可以通过不同的DB来做划分。
On Mon, Aug 10, 2020 at 8:43 PM Zhao,Yi(SEC) wrote:
> 如果使用了Hive
>
针对第5条,似乎只能让source并行度和kafka的topic的分区一致才行,另外针对最后你说的每个任务的字段和类型都不一样,那把这些信息都当做维表信息使用,你已经拆分任务了,那每个任务跑指定的一些维表数据,你的图看着有种在重复计算的样子,不知道你的具体任务信息,可否按5分钟和小时两个大算子进行统计汇聚,topn当sideooutpu输出
发件人: lfgy <15029270...@163.com>
日期: 星期二, 2020年8月11日 00:11
收件人: user-zh@flink.apache.org
主题: 请教flink计算一些报表需求的实现(附带任务图)
请教下,flink on yarn : 无法分发jar包
[root@yueworldnode05 flink]# /opt/flink-1.11.1/bin/flink run -m yarn-cluster -c
yueworld.upgrade.SavepointForRestore flink_1.11.1-1.0.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
Hi
恩, 重新试了下, 这种是可以的, 前面是我操作错了, 谢谢~
Thx
在 2020-08-10 13:36:36,"Yang Wang" 写道:
>你是自己打了一个新的镜像,把flink-shaded-hadoop-2-uber-2.8.3-10.0.jar放到lib下面了吗
>如果是的话不应该有这样的问题
>
>Best,
>Yang
>
>RS 于2020年8月10日周一 下午12:04写道:
>
>> Hi,
>> 我下载了flink-shaded-hadoop-2-uber-2.8.3-10.0.jar, 然后放到了lib下, 重启了集群,
>>
> 不可以使用jdbc catalog,但使用hive connector嘛?
关于这一点稍微补充一下,我们目前访问hive元数据要求必须启动一个HMS,然后我们通过这个HMS来读写元数据(HiveCatalog就是用来对接HMS的),而不是直接去读底层的DBMS的,所以jdbc
catalog是读不了hive元数据的。
On Tue, Aug 11, 2020 at 9:32 AM Leonard Xu wrote:
> Hi Zhao
>
> > 1 为什么flinksql 1.11中,JDBC
>
部署的能不能把依赖和配置文件单独出来指定,而不是打成一个jar,如果可以具体要怎么做?
发自我的iPhone
Hi Yang
数据倾斜严重的task处理数据比较慢,barrier很可能卡在了channel里面,导致task一直无法收到barrier触发该task上的checkpoint。这也与你观察到的现象一致(sync+async的时间很短)。
数据倾斜情况下,无论哪种state
backend都会比较吃力。FsStateBackend在不发生GC的情况下,性能是要优于RocksDB的,如果只是简单的切换state
backend,也应该考虑将RocksDB使用的managed memory转移到JVM heap上,建议观察一下GC日志,是否存在频繁的GC和Full GC。
hi
1 checkpoint/savepoint 可以理解为将 状态备份到远程存储,恢复的时候会通过 operator 的 uid 来恢复
state,如果你确定不希望某些 operator 的 state 不进行恢复的话,或者使用不同的 uid
可以达到你的需求,具体的可以看一下这个文档的内容[1]
2 合并的时候如果想把 savepoint/checkpoint 用起来,还是需要修改 checkpoint/savepoint
的内容,或者你可以试试 state processor api[2]
[1]
hi
grouby count(*)??
----
??:
"user-zh"
您好,这里有个问题反馈下!
读取hive表的数据转化为流;按照jdbc两阶段提交方式写入oracle数据库,
没有抛任何异常但是checkpoint失败:
job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED
instead. Aborting checkpoint.
附件
1.flink.log是yarn jobmanager打印的伪日志
2.Job.txt是job的伪代码
3.jdbc两阶段提交的伪代码附件
发自我的iPhone
感谢唐云老师,任务的确是存在一定的数据倾斜,执行cp时间比较久是因为其中一个subtask执行cp的时间太久了主要是end-to-end时间长
而且这个subtask就是数据倾斜比较严重的task,我测试的时候重启任务都是从最新的offset重启的是随着每次执行cp时间越来越长,监控上显示kafkasource端日志堆积越来越大,但是相同的代码只是修改了statebackend为rocksdb
就不存在这种问题,所以很奇怪为什么用的内存反而不如rocksdb了
Yun Tang 于2020年8月10日周一 下午4:21写道:
> Hi Yang
>
>
Flink 一个 job 不同的 operator 可以共享 slot
但能做到不同的 job 共享 slot 吗?
wangl...@geekplus.com
flink 1.11.0的话,依赖的beam版本是2.19.0,所以先不要用最新的beam 2.23.0
针对你这个问题,应该是在启动过程中,由于某种原因,Python进程启动失败了,常见的原因有:依赖缺失,Python版本不对等等。
按说在log文件里(你发的log信息所在的log文件),应该能看到详细的原因,你的log文件里面没有详细的失败原因吗?
> 在 2020年8月7日,下午1:44,lgs <9925...@qq.com> 写道:
>
> Hi Jincheng,
>
> 我现在碰到同样的问题,udf运行的时候会打印这样的log:
> 2020-08-07
不可以的
Thank you~
Xintong Song
On Mon, Aug 10, 2020 at 3:39 PM wangl...@geekplus.com
wrote:
>
> Flink 一个 job 不同的 operator 可以共享 slot
> 但能做到不同的 job 共享 slot 吗?
>
>
>
>
>
> wangl...@geekplus.com
>
>
Hi,咨询各位一个问题,我们线上任务使用rocksdb作为statebackend
时间久了发现会出现反压,查看服务器监控发现机器io经常是满的,为了保证业务稳定性,现在将statebackend改为filesystem,但是发现已经配置了很大的内存,使用filesystem之后执行cp时间特别长,而且kafka数据源积压很大,大家有遇到这种情况的吗?是使用filesystem的姿势不对吗?
hi、
FlinkOnYarn集群部署是推荐使用yarn-session模式所有任务共用一个,还是推荐使用preJob模式每个任务起一个小集群
Hello,
关于这个问题,可以查看官方文档中关于空闲输入或空闲源的处理策略的解释:
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
另外附上一篇相关文章:
https://mp.weixin.qq.com/s/108o9iwEZaHyMoRBGUKX8g
希望能帮助到你
发件人: Zhao,Yi(SEC)
发送时间: 2020-08-10 10:08
收件人:
Hi Yang
checkpoint执行时间长,具体是同步阶段还是异步阶段长呢,亦或是同步+异步时间不长但是end-to-end 时间长呢?
如果是异步阶段时间长,一般是因为使用的DFS性能较差。
如果各个阶段时间均不长,但是总体时间很长,很有可能还是因为反压(如果启用了exactly once
checkpoint,可以观察是否buffered的数据很多)
kafka数据源积压的数据多,不就是说明source端存在延迟么,这种说明整体作业还是处于反压的状态,需要定位一下究竟是哪里在反压,不一定与使用FsStateBackend有直接关系。
祝好
唐云
Hi,
我看你开了minibatch,你用了aggregate算子了吗?
> -原始邮件-
> 发件人: op <520075...@qq.com>
> 发送时间: 2020-08-10 10:50:08 (星期一)
> 收件人: user-zh
> 抄送:
> 主题: 回复: flink sql状态清理问题
>
> 配置了minIdleStateRetentionTime ,
> val tConfig = tableEnv.getConfig
> tConfig.setIdleStateRetentionTime(Time.minutes(5),
Flink on yarn模式
zjfpla...@hotmail.com
发件人: zjfpla...@hotmail.com
发送时间: 2020-08-10 15:09
收件人: user-zh
主题: flink krb5.conf配置
Hi,
flink针对kerberos开启的cdh集群,是如何配置krb5.conf的?
zjfpla...@hotmail.com
我觉得原因应该是 postgres 中在建表的时候,默认会把字段名转成小写的,所以你在 Flink SQL 这边也要声明成小写的。
你可以在postgres 中看一下表的字段信息。
Best,
Jark
On Fri, 7 Aug 2020 at 13:48, lgs <9925...@qq.com> wrote:
> schema是public
> 问题在这里:ERROR: column "recordid" of relation "alarm_history_data" does not
> exist
>
>
Hi,
flink针对kerberos开启的cdh集群,是如何配置krb5.conf的?
zjfpla...@hotmail.com
46 matches
Mail list logo