flink sql job 并行度

2020-04-28 Thread lucas.wu
Hi all: 最近在使用flink sql 做一些计算任务,发现如果一条sql被解析成execute plan后可能会有多个job,但是这些job的并行度是一样的,目前来看,好像还不能对这些job进行并行度的调整,请问一下大家,有什么办法可能调整sql解析后的job的并行度呢?

回复:Flink 1.10 的 JDBCUpsertOutputFormat flush方法的重试机制无法生效

2020-03-23 Thread lucas.wu
hi ,我们之前写mysql遇到过这种问题,因为任务的open函数是任务启动的时候调用的,是只调用一次的。而open函数里面会对jdbc connection进行初始化,当jdbc conection因为各种原因断开的时候,例如空闲时间超过max_idel_timeout。这都会导致flush失败,进而导致整个task重启。所以我们后面参照官方的这个JDBCUpsertOutputFormat 自己写了一个ouputFormat,加上了连接的检查和重连,如果对一致性要求不高的话,还可以对flush进行异常捕捉。 原始邮件 发件人:shangwen583767...@qq.com

Re: rowtime 的类型序列化问题

2020-03-20 Thread lucas.wu
Hi, 请问使用的是 blink planner 么?可以把 sinkTable 的定义也发一下吗? Best, Jark On Fri, 20 Mar 2020 at 11:40, lucas.wu lucas...@xiaoying.com wrote: Hi all: 建表语句 create table `source_table`( `SeqNo` varchar, `Type` varchar, `Table` varchar, `ServerId` varchar, `Database` varchar, `OldData` varchar, `GTID

Re: flink sql 去重算法

2020-03-19 Thread lucas.wu
可以考虑自己实现一个udf ,使用bitmap或者hyperloglog去实现。 原始邮件 发件人:zhishengzhisheng2...@gmail.com 收件人:user-zhuser...@flink.apache.org 发送时间:2020年3月20日(周五) 11:44 主题:Re: flink sql 去重算法 hi, 我发现我们生产有些使用 SQL 的 count distinct 去做去重,当作业跑了很久,作业的 Checkpoint state 很大(我这周就遇到过一个差不多 400G 的,导致 Checkpoint 很容易超时,并且可能会对 HDFS

rowtime 的类型序列化问题

2020-03-19 Thread lucas.wu
Hi all: 建表语句 create table `source_table`( `SeqNo` varchar, `Type` varchar, `Table` varchar, `ServerId` varchar, `Database` varchar, `OldData` varchar, `GTID` varchar, `Offset` varchar, `event_ts` as to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'), WATERMARK FOR event_ts AS

转发:Re: sql关键字问题

2020-03-18 Thread lucas.wu
lucas, 赞专业的分析,看起来是Flink的bug,你可以建个Jira来跟踪。 CC: @Yuzhao Chen yuzhao@gmail.com Best, Jingsong Lee On Wed, Mar 18, 2020 at 4:15 PM lucas.wu lucas...@xiaoying.com wrote:初步找到了原因 原来我的建表语句用了computed_column_expression 这种语义。 然后flink内部在使用的时候其实是把它转成了select 语句

回复:sql关键字问题

2020-03-18 Thread lucas.wu
初步找到了原因 原来我的建表语句用了computed_column_expression 这种语义。 然后flink内部在使用的时候其实是把它转成了select 语句 ... if (columnExprs.nonEmpty) { val fieldExprs = fieldNames .map { name = if (columnExprs.contains(name)) { columnExprs(name) } else { name } }.toArray val rexNodes =

sql关键字问题

2020-03-18 Thread lucas.wu
create table `source_table`( `SeqNo` varchar, `Type` varchar, `Table` varchar, `ServerId` varchar, `Database` varchar, `OldData` varchar, `GTID` varchar, `Offset` varchar, `event_ts` as to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'), WATERMARK FOR event_ts AS

回复:ddl

2020-03-13 Thread lucas.wu
有相应的接口 可以参考hbase的实现 原始邮件 发件人:王志华a15733178...@163.com 收件人:user-zhuser...@flink.apache.org 发送时间:2020年3月13日(周五) 19:17 主题:ddl 目前FLINK中对于DDL这块,它都只能什么类型的技术作为源头表或者SINK 表呢,我也网上也仅仅看到了ddl mysql sink、ddl hbase sink等。还有其他类型的支持吗?如果不支持的话,是否flink开放了相关的接口,可以提供对其他类型技术的ddl语法支持呢?比如想做一个 ddl kudu sink之类的 | | 王志华

回复:flink 长时间运行后出现报错

2020-03-09 Thread lucas.wu
没人回复大概是之前没人遇到过这种问题,所以下午看了flink的代码,终于有了点头绪。 原因分析: 这个异常的原因就是在task出现异常之后,它需要调用updateTaskExecutionState(TaskExecutionState taskExecutionState)这个rpc接口去通知flink jobmanager 去改变对应task的状态并且重启task。但是呢,taskExecutionState这个参数里面有个error属性,当我的的task打出来的错误栈太多的时候,在序列化的之后超过了 rpc接口要求的最大数据大小(也就是maximum akka

flink 长时间运行后出现报错

2020-03-08 Thread lucas.wu
大家好: 我的flink程序的主要功能是从kafka消费数据,简单处理后,通过jdbc outputformat发往mysql,但是长时间运行后,报下面的错。请问是什么原因造成的,我增加参数就可以解决吗? 2020-03-08 06:10:30,480 WARN org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler - Could not create remote rpc invocation message. Failing rpc invocation because... java.io.IOException:

关于task异常的问题

2020-03-02 Thread lucas.wu
Hi 大家好 最近有使用flink自带的jdbc outputformat 将flink处理后的数据写到mysql,但是如果我的数据格式有问题,比如超过mysql对应字段设置的大小,或者数据库出现问题,导致延时。这些问题都会导致这个task抛出异常,导致task fail,进而导致整个job从checkpoint重启。 我的问题是,如果我使用的是flink提供的outputformat,我是否可以catch 异常,并且忽略。如果没有,有没有其它好的办法?

回复:Flink 1.6 版本,RocksDBStateBackend ,增量 Checkpoint 目录问题

2020-01-16 Thread lucas.wu
可能是失败的checkpoint目录,可以看看程序中间是不是有失败的checkpoint 原始邮件 发件人:lakeshenshenleifight...@gmail.com 收件人:user-zhuser...@flink.apache.org 发送时间:2020年1月16日(周四) 16:47 主题:Flink 1.6 版本,RocksDBStateBackend ,增量 Checkpoint 目录问题 我使用 Flink 1.6 版本,使用的增量 Checkpoint,我看官网说,默认的 Checkpoint 保留目录是1,也就是会保留一个最新完成的 Checkpoint

Re: flink 创建hbase出错

2020-01-02 Thread lucas.wu
,你需要把hbaes client等相关的jar包提供出来放到 lib包里面。 Best, Terry Wang 2020年1月2日 16:54,lucas.wu lucas...@xiaoying.com 写道: Hi 大家好 有个问题要问问大家,我现在用flink1.9版本创建hbase表 sql: create table hbase_dimention_table( id varchar, info ROW(xxx) )with( 'connector.type' = 'hbase', 'connector.version' = '1.4.3

flink 创建hbase出错

2020-01-02 Thread lucas.wu
Hi 大家好 有个问题要问问大家,我现在用flink1.9版本创建hbase表 sql: create table hbase_dimention_table( id varchar, info ROW(xxx) )with( 'connector.type' = 'hbase', 'connector.version' = '1.4.3', 'connector.table-name' = '', 'connector.zookeeper.quorum' = ‘xxx' ); 接着把flink-hbase_2.11-1.9.0.jar

Re: flink 维表关联

2019-12-25 Thread lucas.wu
,lucas.wu: 我个人觉得可以把join的条件和流对应的数据存放在mapstate中,每次维表的缓存更新数据之后,去mapstate中查询,如果存在对应的KV,将新关联后的数据下发; 不过这样state会占用很大的内存,需要主意state的清理 lucas.wu lucas...@xiaoying.com 于2019年12月25日周三 下午5:13写道:hi all: flink的kafka流表与hbase维表关联,维表后面有变动的话,如何将之前关联过的数据进行更新?

flink 维表关联

2019-12-25 Thread lucas.wu
hi all: flink的kafka流表与hbase维表关联,维表后面有变动的话,如何将之前关联过的数据进行更新?

使用flink 做维表关联

2019-12-20 Thread lucas.wu
hi 大家好: 最近有在调研使用flink做实时数仓,但是有个问题没弄清楚,就是明细表和维度表做join的时候,该采取什么的方案?目前的想到的就是明细表通过流消费进来,维度表放缓存。但是这种方案有弊端,就是维度表更新后,历史join过的数据无法再更新。不知道大家还有什么其他的方案?ps:目前有看到flink有支持join,这种需要两个表都是流的方式进入flink,然后会将历史的数据保存在state里面,这种对于量大的表会不会有问题?

Re: Flink RetractStream如何转成AppendStream?

2019-12-08 Thread lucas.wu
可以使用类似的方式 // val sstream = result4.toRetractStream[Row],filter(_.1==trye).map(_._2) // val result5 = tEnv.fromDataStream(sstream) // result5.toAppendStream[Row].print() 原始邮件 发件人:Jark wuimj...@gmail.com 收件人:user-zhuser...@flink.apache.org 发送时间:2019年12月8日(周日) 11:53 主题:Re: Flink

回复:flink检查点状态大小

2019-11-26 Thread lucas.wu
你这个用了distinct的话,肯定就是针对全局的id进行distinct了,设置state也无效。 建议自己使用bigmap或者hyperlog算法实现一个distinct,这样可以节省内存 原始邮件 发件人:谷歌-akulakuzhan...@akulaku.com 收件人:user-zh@flink.apache.orguser...@flink.apache.org 发送时间:2019年11月26日(周二) 20:05 主题:flink检查点状态大小