Hi all:
最近在使用flink sql 做一些计算任务,发现如果一条sql被解析成execute
plan后可能会有多个job,但是这些job的并行度是一样的,目前来看,好像还不能对这些job进行并行度的调整,请问一下大家,有什么办法可能调整sql解析后的job的并行度呢?
hi ,我们之前写mysql遇到过这种问题,因为任务的open函数是任务启动的时候调用的,是只调用一次的。而open函数里面会对jdbc
connection进行初始化,当jdbc
conection因为各种原因断开的时候,例如空闲时间超过max_idel_timeout。这都会导致flush失败,进而导致整个task重启。所以我们后面参照官方的这个JDBCUpsertOutputFormat
自己写了一个ouputFormat,加上了连接的检查和重连,如果对一致性要求不高的话,还可以对flush进行异常捕捉。
原始邮件
发件人:shangwen583767...@qq.com
Hi, 请问使用的是 blink planner 么?可以把 sinkTable 的定义也发一下吗? Best, Jark On Fri, 20 Mar
2020 at 11:40, lucas.wu lucas...@xiaoying.com wrote: Hi all: 建表语句 create
table `source_table`( `SeqNo` varchar, `Type` varchar, `Table` varchar,
`ServerId` varchar, `Database` varchar, `OldData` varchar, `GTID
可以考虑自己实现一个udf ,使用bitmap或者hyperloglog去实现。
原始邮件
发件人:zhishengzhisheng2...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年3月20日(周五) 11:44
主题:Re: flink sql 去重算法
hi, 我发现我们生产有些使用 SQL 的 count distinct 去做去重,当作业跑了很久,作业的 Checkpoint state
很大(我这周就遇到过一个差不多 400G 的,导致 Checkpoint 很容易超时,并且可能会对 HDFS
Hi all:
建表语句
create table `source_table`(
`SeqNo` varchar,
`Type` varchar,
`Table` varchar,
`ServerId` varchar,
`Database` varchar,
`OldData` varchar,
`GTID` varchar,
`Offset` varchar,
`event_ts` as
to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),
WATERMARK FOR event_ts AS
lucas, 赞专业的分析,看起来是Flink的bug,你可以建个Jira来跟踪。 CC: @Yuzhao Chen
yuzhao@gmail.com Best, Jingsong Lee On Wed, Mar 18, 2020 at 4:15 PM
lucas.wu lucas...@xiaoying.com wrote:初步找到了原因
原来我的建表语句用了computed_column_expression 这种语义。 然后flink内部在使用的时候其实是把它转成了select 语句
初步找到了原因
原来我的建表语句用了computed_column_expression 这种语义。
然后flink内部在使用的时候其实是把它转成了select 语句
...
if (columnExprs.nonEmpty) {
val fieldExprs = fieldNames
.map { name =
if (columnExprs.contains(name)) {
columnExprs(name)
} else {
name
}
}.toArray
val rexNodes =
create table `source_table`(
`SeqNo` varchar,
`Type` varchar,
`Table` varchar,
`ServerId` varchar,
`Database` varchar,
`OldData` varchar,
`GTID` varchar,
`Offset` varchar,
`event_ts` as
to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),
WATERMARK FOR event_ts AS
有相应的接口 可以参考hbase的实现
原始邮件
发件人:王志华a15733178...@163.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年3月13日(周五) 19:17
主题:ddl
目前FLINK中对于DDL这块,它都只能什么类型的技术作为源头表或者SINK 表呢,我也网上也仅仅看到了ddl mysql sink、ddl hbase
sink等。还有其他类型的支持吗?如果不支持的话,是否flink开放了相关的接口,可以提供对其他类型技术的ddl语法支持呢?比如想做一个 ddl kudu
sink之类的 | | 王志华
没人回复大概是之前没人遇到过这种问题,所以下午看了flink的代码,终于有了点头绪。
原因分析:
这个异常的原因就是在task出现异常之后,它需要调用updateTaskExecutionState(TaskExecutionState
taskExecutionState)这个rpc接口去通知flink jobmanager
去改变对应task的状态并且重启task。但是呢,taskExecutionState这个参数里面有个error属性,当我的的task打出来的错误栈太多的时候,在序列化的之后超过了
rpc接口要求的最大数据大小(也就是maximum akka
大家好:
我的flink程序的主要功能是从kafka消费数据,简单处理后,通过jdbc
outputformat发往mysql,但是长时间运行后,报下面的错。请问是什么原因造成的,我增加参数就可以解决吗?
2020-03-08 06:10:30,480 WARN
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler - Could not create
remote rpc invocation message. Failing rpc invocation because...
java.io.IOException:
Hi 大家好
最近有使用flink自带的jdbc outputformat
将flink处理后的数据写到mysql,但是如果我的数据格式有问题,比如超过mysql对应字段设置的大小,或者数据库出现问题,导致延时。这些问题都会导致这个task抛出异常,导致task
fail,进而导致整个job从checkpoint重启。
我的问题是,如果我使用的是flink提供的outputformat,我是否可以catch 异常,并且忽略。如果没有,有没有其它好的办法?
可能是失败的checkpoint目录,可以看看程序中间是不是有失败的checkpoint
原始邮件
发件人:lakeshenshenleifight...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年1月16日(周四) 16:47
主题:Flink 1.6 版本,RocksDBStateBackend ,增量 Checkpoint 目录问题
我使用 Flink 1.6 版本,使用的增量 Checkpoint,我看官网说,默认的 Checkpoint 保留目录是1,也就是会保留一个最新完成的
Checkpoint
,你需要把hbaes client等相关的jar包提供出来放到 lib包里面。 Best, Terry Wang 2020年1月2日
16:54,lucas.wu lucas...@xiaoying.com 写道: Hi 大家好
有个问题要问问大家,我现在用flink1.9版本创建hbase表 sql: create table hbase_dimention_table( id
varchar, info ROW(xxx) )with( 'connector.type' = 'hbase',
'connector.version' = '1.4.3
Hi 大家好
有个问题要问问大家,我现在用flink1.9版本创建hbase表
sql:
create table hbase_dimention_table(
id varchar,
info ROW(xxx)
)with(
'connector.type' = 'hbase',
'connector.version' = '1.4.3',
'connector.table-name' = '',
'connector.zookeeper.quorum' = ‘xxx'
);
接着把flink-hbase_2.11-1.9.0.jar
,lucas.wu:
我个人觉得可以把join的条件和流对应的数据存放在mapstate中,每次维表的缓存更新数据之后,去mapstate中查询,如果存在对应的KV,将新关联后的数据下发;
不过这样state会占用很大的内存,需要主意state的清理 lucas.wu lucas...@xiaoying.com
于2019年12月25日周三 下午5:13写道:hi all:
flink的kafka流表与hbase维表关联,维表后面有变动的话,如何将之前关联过的数据进行更新?
hi all:
flink的kafka流表与hbase维表关联,维表后面有变动的话,如何将之前关联过的数据进行更新?
hi 大家好:
最近有在调研使用flink做实时数仓,但是有个问题没弄清楚,就是明细表和维度表做join的时候,该采取什么的方案?目前的想到的就是明细表通过流消费进来,维度表放缓存。但是这种方案有弊端,就是维度表更新后,历史join过的数据无法再更新。不知道大家还有什么其他的方案?ps:目前有看到flink有支持join,这种需要两个表都是流的方式进入flink,然后会将历史的数据保存在state里面,这种对于量大的表会不会有问题?
可以使用类似的方式
// val sstream = result4.toRetractStream[Row],filter(_.1==trye).map(_._2)
// val result5 = tEnv.fromDataStream(sstream)
// result5.toAppendStream[Row].print()
原始邮件
发件人:Jark wuimj...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2019年12月8日(周日) 11:53
主题:Re: Flink
你这个用了distinct的话,肯定就是针对全局的id进行distinct了,设置state也无效。
建议自己使用bigmap或者hyperlog算法实现一个distinct,这样可以节省内存
原始邮件
发件人:谷歌-akulakuzhan...@akulaku.com
收件人:user-zh@flink.apache.orguser...@flink.apache.org
发送时间:2019年11月26日(周二) 20:05
主题:flink检查点状态大小
20 matches
Mail list logo