flink sql job 并行度

2020-04-28 文章 lucas.wu
Hi all:
最近在使用flink sql 做一些计算任务,发现如果一条sql被解析成execute 
plan后可能会有多个job,但是这些job的并行度是一样的,目前来看,好像还不能对这些job进行并行度的调整,请问一下大家,有什么办法可能调整sql解析后的job的并行度呢?

回复:Flink 1.10 的 JDBCUpsertOutputFormat flush方法的重试机制无法生效

2020-03-23 文章 lucas.wu
hi ,我们之前写mysql遇到过这种问题,因为任务的open函数是任务启动的时候调用的,是只调用一次的。而open函数里面会对jdbc 
connection进行初始化,当jdbc 
conection因为各种原因断开的时候,例如空闲时间超过max_idel_timeout。这都会导致flush失败,进而导致整个task重启。所以我们后面参照官方的这个JDBCUpsertOutputFormat
 自己写了一个ouputFormat,加上了连接的检查和重连,如果对一致性要求不高的话,还可以对flush进行异常捕捉。


原始邮件
发件人:shangwen583767...@qq.com
收件人:user-zhuser...@flink.apache.org
抄送:kevin.shangwenkevin.shang...@gmail.com
发送时间:2020年3月23日(周一) 11:05
主题:Flink 1.10 的 JDBCUpsertOutputFormat flush方法的重试机制无法生效


我们在测试环境测试JDBC写入postgresql的场景,用tcpkill模拟链接被关闭的情况,测试对异常的兼容性,我们发现一直打印类似的日志 
2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormatnbsp; - JDBC 
executeBatch error, retry times = 1 org.postgresql.util.PSQLException: This 
connection has been closed. at 
org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857) at 
org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817) at 
org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813) at 
org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873) at 
org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
 at 
org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
 at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
 at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) 从日志上看来,链接被关闭,只会导致重试一次,这里是Flink的重试代码 
//JDBCUpsertOutputFormat.javapublic synchronized void flush() throws Exception 
{ checkFlushException(); for (int i = 1; i = maxRetryTimes; i++) { try { 
jdbcWriter.executeBatch(); batchCount = 0; break; } catch (SQLException e) { 
LOG.error("JDBC executeBatch error, retry times = {}", i, e); if (i gt;= 
maxRetryTimes) { throw e; } Thread.sleep(1000 * i); } } } 通过远程debug分析,在第一次执行 
JDBCUpsertOutputFormat.flush nbsp; -gt; AppendOnlyWriter.executeBatch nbsp; 
nbsp; nbsp;... nbsp; nbsp; nbsp;-gt; PgConnection.getAutoCommit 
抛出PSQLException: This connection has been closed时,batchStatements在这之前已经被清空 // 
PgStatement.java private BatchResultHandler internalExecuteBatch() throws 
SQLException { // Construct query/parameter arrays. 
transformQueriesAndParameters(); // Empty arrays should be passed to toArray // 
see http://shipilev.net/blog/2016/arrays-wisdom-ancients/ Query[] queries = 
batchStatements.toArray(new Query[0]); ParameterList[] parameterLists = 
batchParameters.toArray(new ParameterList[0]); batchStatements.clear(); // 
这里已经被清空 batchParameters.clear(); ... if (connection.getAutoCommit()) { // 抛出异常 
flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN; } ... } 
所以在Flink第二次重试执行这段代码的时候jdbcWriter.executeBatch的时候,batchStatements已经为Empty并直接返回了,导致Flink认为是成功的,这样会导致其实语句没有被执行?
 // PgStatement.java public int[] executeBatch() throws SQLException { 
checkClosed(); closeForNextExecution(); if (batchStatements == null || 
batchStatements.isEmpty()) { //这里就直接返回了 return new int[0]; } return 
internalExecuteBatch().getUpdateCount(); } 
目前的想法是,当我出现异常捕获的时候,在重试之前,判断这个链接是否已经被关闭,如果已经被关闭,则重新open,不知道大家有啥看法,这是我提的issue 
https://issues.apache.org/jira/browse/FLINK-16708

Re: rowtime 的类型序列化问题

2020-03-20 文章 lucas.wu
是的 使用的是blink planner。因为我基于flink的基础上又做了一些简单的开发,所以sinkTable的schmea我是先读取了Select * 
from source_table,
然后把它注册成了一个临时表,然后把这个临时表的schema赋给sinktable,sinkTable同时也继承了RetractStreamTableSink[Row]。
这是他的一个operator连接图
Source: KafkaTableSource(SeqNo, Type, Table, ServerId, Database, OldData, GTID, 
Data, Timestamp, Offset) - 
SourceConversion(table=[default_catalog.default_database.source_table, source: 
[KafkaTableSource(SeqNo, Type, Table, ServerId, Database, OldData, GTID, Data, 
Timestamp, Offset)]], fields=[SeqNo, Type, Table, ServerId, Database, OldData, 
GTID, Data, Timestamp, Offset]) - Calc(select=[SeqNo, Type, Table, ServerId, 
Database, OldData, GTID, Data, Timestamp, Offset, 
from_unixtime((Data.FuiUpdateTime / 1000)) AS FuiUpdateTimeSec, 
(from_unixtime((Data.FuiUpdateTime / 1000)) TO_TIMESTAMP _UTF-16LE'-MM-dd 
HH:mm:ss') AS event_ts]) - WatermarkAssigner(rowtime=[event_ts], 
watermark=[(event_ts - 6:INTERVAL SECOND)]) - Calc(select=[event_ts]) - 
SinkConversionToTuple2 - Sink: ConsoleTableSink(event_ts)


目前从报错信息看,可能是SinkConversionToTuple2这个operator有点问题。
这个算子的 inTypeInfo是BaseRow(event_ts: TIMESTAMP(3) *ROWTIME*)
outTypeInfo是Java Tuple2Boolean, Row(event_ts: TimeIndicatorTypeInfo(rowtime)) 
这两种对TimeIndicatorTypeInfo序列化方式是不一样的。
一个使用BaseRowSerializer会将TimeIndicatorTypeInfo的序列化方式设置成SqlTimestampSerializer,而在RowTypeInfo会使用LongSerializer。
 所以我猜测是这里出现了问题。

原始邮件
发件人:Jark wuimj...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年3月20日(周五) 14:21
主题:Re: rowtime 的类型序列化问题


Hi, 请问使用的是 blink planner 么?可以把 sinkTable 的定义也发一下吗? Best, Jark On Fri, 20 Mar 
2020 at 11:40, lucas.wu lucas...@xiaoying.com wrote:  Hi all:  建表语句  create 
table `source_table`(  `SeqNo` varchar,  `Type` varchar,  `Table` varchar,  
`ServerId` varchar,  `Database` varchar,  `OldData` varchar,  `GTID` varchar,  
`Offset` varchar,  `event_ts` as  
to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),  
WATERMARK FOR event_ts AS event_ts - interval '60' second  ) with(…)查询语句  
insert into sinkTable from Select * from source_table; 报错信息:  
java.lang.ClassCastException: java.sql.Timestamp cannot be cast to  
java.lang.Long at  
org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
  at  
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
  at  
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
  at  
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
  at  
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
  at  
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
  at  
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
  at  
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
  at  
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
  at  
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
  at SinkConversion$51.processElement(Unknown Source)  …… 
最后查看代码,发现对于rowtime,在BaseRowTypeInfo下会是使用SqlTimestampSerializer,而在RowTypeInfo会使用LongSerializer,上下游使用serializer不一样,上游使用SqlTimestampSerializer下游使用LongSerializer就会报错。
  请问这个问题可以避免吗?

Re: flink sql 去重算法

2020-03-19 文章 lucas.wu
可以考虑自己实现一个udf ,使用bitmap或者hyperloglog去实现。


原始邮件
发件人:zhishengzhisheng2...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年3月20日(周五) 11:44
主题:Re: flink sql 去重算法


hi, 我发现我们生产有些使用 SQL 的 count distinct 去做去重,当作业跑了很久,作业的 Checkpoint state 
很大(我这周就遇到过一个差不多 400G 的,导致 Checkpoint 很容易超时,并且可能会对 HDFS 
集群的网卡也有一定的压力),我看官网文档有介绍说使用 query_configuration 
">https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
 ,除此之外不清楚大家是否还有什么其他好的解决方法? Benchao Li libenc...@gmail.com 于2020年3月20日周五 
上午9:50写道:  Hi hiliuxg,   count distinct 用的MapVIew来做的去重:  
在batch场景下,MapView的底层实现就是HashMap;  
在streaming场景下,MapView的底层实现是MapState,因为必须要用到state+cp,才能保证任务重启后状态不会丢失。   hiliuxg 
736742...@qq.com 于2020年3月19日周四 下午11:31写道:hi all:   请问flink sqlnbsp; 
count(disitinct)nbsp; 底层的算法是怎样的? 是bitmap ?   还是简单通过java的set容器去重的呢? --   
Benchao Li  School of Electronics Engineering and Computer Science, Peking 
University  Tel:+86-15650713730  Email: libenc...@gmail.com; 
libenc...@pku.edu.cn

rowtime 的类型序列化问题

2020-03-19 文章 lucas.wu
Hi all:
建表语句
create table `source_table`(
`SeqNo` varchar,
`Type` varchar,
`Table` varchar,
`ServerId` varchar,
`Database` varchar,
`OldData` varchar,
`GTID` varchar,
`Offset` varchar,
`event_ts` as 
to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),
WATERMARK FOR event_ts AS event_ts - interval '60' second
) with(…)


查询语句
insert into sinkTable from Select * from source_table;



报错信息:
java.lang.ClassCastException: java.sql.Timestamp cannot be cast to 
java.lang.Long at 
org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
 at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
 at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
 at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
 at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
 at SinkConversion$51.processElement(Unknown Source)
……


最后查看代码,发现对于rowtime,在BaseRowTypeInfo下会是使用SqlTimestampSerializer,而在RowTypeInfo会使用LongSerializer,上下游使用serializer不一样,上游使用SqlTimestampSerializer下游使用LongSerializer就会报错。
请问这个问题可以避免吗?

转发:Re: sql关键字问题

2020-03-18 文章 lucas.wu
原始邮件
发件人:lucas.wulucas...@xiaoying.com
收件人:imj...@gmail.com
发送时间:2020年3月18日(周三) 17:21
主题:Re: sql关键字问题


Hi,jark
看到了你修复的这个jirahttps://issues.apache.org/jira/browse/FLINK-16526
但是看了你的代码和描述,你只是针对SqlBasicCall这种node的字段名加了`` 
,也就是说只会对有computed_column_expression的字段加上,但是对于普通的字段并没有覆盖到,请问我理解的正确吗?


原始邮件
发件人:Kurt youngykt...@gmail.com
收件人:user-zhuser...@flink.apache.org
抄送:Yuzhao chenyuzhao@gmail.com
发送时间:2020年3月18日(周三) 16:41
主题:Re: sql关键字问题


好像已经有了,应该是这个jira: https://issues.apache.org/jira/browse/FLINK-16526 Best, Kurt 
On Wed, Mar 18, 2020 at 4:19 PM Jingsong Li jingsongl...@gmail.com wrote:  Hi 
lucas,   赞专业的分析,看起来是Flink的bug,你可以建个Jira来跟踪。  CC: @Yuzhao Chen 
yuzhao@gmail.com   Best,  Jingsong Lee   On Wed, Mar 18, 2020 at 4:15 PM 
lucas.wu lucas...@xiaoying.com wrote:初步找到了原因   
原来我的建表语句用了computed_column_expression 这种语义。   然后flink内部在使用的时候其实是把它转成了select 语句   
...   if (columnExprs.nonEmpty) {   val fieldExprs = fieldNames   .map { name = 
  if (columnExprs.contains(name)) {   columnExprs(name)   } else {   name   }   
}.toArray   val rexNodes =   
toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs)   ….. 
  然后我们看看convertToRexNodes方法   public RexNode[] 
convertToRexNodes(String[] exprs) {   ….   String query = 
String.format(QUERY_FORMAT, String.join(",", exprs));   SqlNode parsed = 
planner.parser().parse(query);   }   重点就在这个QUERY_FORMAT   private static 
final String QUERY_FORMAT = "SELECT %s FROM " +   TEMPORARY_TABLE_NAME;   
这样写是有问题的,当我的字段本身是有``的时候,就被去掉了,导致后面valid的时候就报错。   所以这个是算flink的bug吗?   原始邮件   
发件人:lucas.wulucas...@xiaoying.com   收件人:user-zhuser...@flink.apache.org   
发送时间:2020年3月18日(周三) 15:36   主题:sql关键字问题   create table `source_table`( 
`SeqNo` varchar, `Type` varchar, `Table`   varchar, `ServerId` varchar, 
`Database` varchar, `OldData` varchar,  `GTID`   varchar, `Offset` varchar, 
`event_ts` as   
to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),   
WATERMARK FOR event_ts AS event_ts - interval '60' second ) with(…) 查询语句   
Select * from source_table; 这是我的建表和查询语句,发现即使用了`` 查询的时候还是会报Table是关键字的问题。  SQL   
parse failed. Encountered "Table" at line 1,column 19. 但是一旦我把 `event_ts`  as   
to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),   
WATERMARK FOR event_ts AS event_ts - interval '60’ second 这两行去掉   
,就正常了。是我的使用方法有问题吗? --  Best, Jingsong Lee

回复:sql关键字问题

2020-03-18 文章 lucas.wu
初步找到了原因
原来我的建表语句用了computed_column_expression 这种语义。
然后flink内部在使用的时候其实是把它转成了select 语句
...
if (columnExprs.nonEmpty) {
 val fieldExprs = fieldNames
 .map { name =
 if (columnExprs.contains(name)) {
 columnExprs(name)
 } else {
 name
 }
 }.toArray
 val rexNodes = 
toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs)
…..


然后我们看看convertToRexNodes方法


public RexNode[] convertToRexNodes(String[] exprs) {
….
 String query = String.format(QUERY_FORMAT, String.join(",", exprs));
 SqlNode parsed = planner.parser().parse(query);
}


重点就在这个QUERY_FORMAT
private static final String QUERY_FORMAT = "SELECT %s FROM " + 
TEMPORARY_TABLE_NAME;


这样写是有问题的,当我的字段本身是有``的时候,就被去掉了,导致后面valid的时候就报错。


所以这个是算flink的bug吗?
原始邮件
发件人:lucas.wulucas...@xiaoying.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年3月18日(周三) 15:36
主题:sql关键字问题


create table `source_table`( `SeqNo` varchar, `Type` varchar, `Table` varchar, 
`ServerId` varchar, `Database` varchar, `OldData` varchar, `GTID` varchar, 
`Offset` varchar, `event_ts` as 
to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'), 
WATERMARK FOR event_ts AS event_ts - interval '60' second ) with(…) 查询语句 Select 
* from source_table; 这是我的建表和查询语句,发现即使用了`` 查询的时候还是会报Table是关键字的问题。 SQL parse 
failed. Encountered "Table" at line 1,column 19. 但是一旦我把 `event_ts` as 
to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'), 
WATERMARK FOR event_ts AS event_ts - interval '60’ second 这两行去掉 
,就正常了。是我的使用方法有问题吗?

sql关键字问题

2020-03-18 文章 lucas.wu
create table `source_table`(
 `SeqNo` varchar,
 `Type` varchar,
 `Table` varchar,
 `ServerId` varchar,
 `Database` varchar,
 `OldData` varchar,
 `GTID` varchar,
 `Offset` varchar,
 `event_ts` as 
to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),
 WATERMARK FOR event_ts AS event_ts - interval '60' second
 ) with(…)


查询语句
Select * from source_table;


这是我的建表和查询语句,发现即使用了`` 查询的时候还是会报Table是关键字的问题。
SQL parse failed. Encountered "Table" at line 1,column 19.


但是一旦我把
`event_ts` as 
to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),
WATERMARK FOR event_ts AS event_ts - interval '60’ second
这两行去掉 ,就正常了。是我的使用方法有问题吗?

回复:ddl

2020-03-13 文章 lucas.wu
有相应的接口 可以参考hbase的实现


原始邮件
发件人:王志华a15733178...@163.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年3月13日(周五) 19:17
主题:ddl


目前FLINK中对于DDL这块,它都只能什么类型的技术作为源头表或者SINK 表呢,我也网上也仅仅看到了ddl mysql sink、ddl hbase 
sink等。还有其他类型的支持吗?如果不支持的话,是否flink开放了相关的接口,可以提供对其他类型技术的ddl语法支持呢?比如想做一个 ddl kudu 
sink之类的 | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制

回复:flink 长时间运行后出现报错

2020-03-09 文章 lucas.wu
没人回复大概是之前没人遇到过这种问题,所以下午看了flink的代码,终于有了点头绪。
原因分析:
这个异常的原因就是在task出现异常之后,它需要调用updateTaskExecutionState(TaskExecutionState 
taskExecutionState)这个rpc接口去通知flink jobmanager
去改变对应task的状态并且重启task。但是呢,taskExecutionState这个参数里面有个error属性,当我的的task打出来的错误栈太多的时候,在序列化的之后超过了
rpc接口要求的最大数据大小(也就是maximum akka framesize),导致调用updateTaskExecutionState 
这个rpc接口失败,jobmanager无法获知这个task已经fail
的状态,也无法重启。这就导致了一系列连锁反应,其中一个就是我的checkpoint一直失败,原因就是我的task其实已经释放了,但是jobmanger无法感知。

结论:
这个算不算flink的一个bug,对于task已经失效,但是无法通知到jobmanger,导致该task一直无法重启。
原始邮件
发件人:lucas.wulucas...@xiaoying.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年3月9日(周一) 11:06
主题:flink 长时间运行后出现报错


大家好: 我的flink程序的主要功能是从kafka消费数据,简单处理后,通过jdbc 
outputformat发往mysql,但是长时间运行后,报下面的错。请问是什么原因造成的,我增加参数就可以解决吗? 2020-03-08 
06:10:30,480 WARN org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler - 
Could not create remote rpc invocation message. Failing rpc invocation 
because... java.io.IOException: The rpc invocation size 34500577 exceeds the 
maximum akka framesize. at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271)
 at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200)
 at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78)
 at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428)
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458)
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154)
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at 
akka.actor.Actor$class.aroundReceive(Actor.scala:517) at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130) at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at 
akka.actor.ActorCell.invoke(ActorCell.scala:496) at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at 
akka.dispatch.Mailbox.run(Mailbox.scala:224) at 
akka.dispatch.Mailbox.exec(Mailbox.scala:234) at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
2020-03-08 06:10:30,480 ERROR org.apache.flink.runtime.rpc.akka.AkkaRpcActor - 
Caught exception while executing runnable in main thread. 
java.lang.reflect.UndeclaredThrowableException at 
com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428)
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458)
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154)
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at 
akka.actor.Actor$class.aroundReceive(Actor.scala:517) at 

flink 长时间运行后出现报错

2020-03-08 文章 lucas.wu
大家好:
我的flink程序的主要功能是从kafka消费数据,简单处理后,通过jdbc 
outputformat发往mysql,但是长时间运行后,报下面的错。请问是什么原因造成的,我增加参数就可以解决吗?


2020-03-08 06:10:30,480 WARN 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler - Could not create 
remote rpc invocation message. Failing rpc invocation because...
java.io.IOException: The rpc invocation size 34500577 exceeds the maximum akka 
framesize.
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78)
at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
at akka.actor.ActorCell.invoke(ActorCell.scala:496)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-03-08 06:10:30,480 ERROR org.apache.flink.runtime.rpc.akka.AkkaRpcActor - 
Caught exception while executing runnable in main thread.
java.lang.reflect.UndeclaredThrowableException
at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
at akka.actor.ActorCell.invoke(ActorCell.scala:496)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: The rpc invocation size 34500577 exceeds the 
maximum akka 

关于task异常的问题

2020-03-02 文章 lucas.wu
Hi 大家好
最近有使用flink自带的jdbc outputformat 
将flink处理后的数据写到mysql,但是如果我的数据格式有问题,比如超过mysql对应字段设置的大小,或者数据库出现问题,导致延时。这些问题都会导致这个task抛出异常,导致task
 fail,进而导致整个job从checkpoint重启。
我的问题是,如果我使用的是flink提供的outputformat,我是否可以catch 异常,并且忽略。如果没有,有没有其它好的办法?

回复:Flink 1.6 版本,RocksDBStateBackend ,增量 Checkpoint 目录问题

2020-01-16 文章 lucas.wu
可能是失败的checkpoint目录,可以看看程序中间是不是有失败的checkpoint


原始邮件
发件人:lakeshenshenleifight...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年1月16日(周四) 16:47
主题:Flink 1.6 版本,RocksDBStateBackend ,增量 Checkpoint 目录问题


我使用 Flink 1.6 版本,使用的增量 Checkpoint,我看官网说,默认的 Checkpoint 保留目录是1,也就是会保留一个最新完成的 
Checkpoint 目录,但是在我的任务 Checkpoint 路径下面,居然有很多个 chk-xxx 目录,比如说 
chk-86515,chk-37878,而且在这些目录下面,还有数据,这是什么原因呢。 对这个地方有点困惑,既然默认保留的目录是1了,为什么还有这么多 chk 
目录呢。 期待你的回答

Re: flink 创建hbase出错

2020-01-02 文章 lucas.wu
Hi:
这个方法之前试过了,确实有效的。但是有个地方不明白,就是sql解析这个是在driver端进行的,我的driver的jar包已经包含hbase相关的jar包,为什么还要在lib目录下加上?


原始邮件
发件人:Terry wangzjuwa...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年1月3日(周五) 11:02
主题:Re: flink 创建hbase出错


Hi, flink-hbase_2.11-1.9.0.jar 只包括了flink对hbase读写的封装的类,并没有提供hbase 
client的类,你需要把hbaes client等相关的jar包提供出来放到 lib包里面。 Best, Terry Wang  2020年1月2日 
16:54,lucas.wu lucas...@xiaoying.com 写道:   Hi 大家好  
有个问题要问问大家,我现在用flink1.9版本创建hbase表  sql:  create table hbase_dimention_table(  id 
varchar,  info ROW(xxx)  )with(  'connector.type' = 'hbase',  
'connector.version' = '1.4.3',  'connector.table-name' = '',  
'connector.zookeeper.quorum' = ‘xxx'  );  接着把flink-hbase_2.11-1.9.0.jar 
放到了lib目录下,但是在执行的时候出现这种错误  
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: SQL validation failed. findAndCreateTableSource failed  Caused 
by: java.lang.ClassNotFoundException: 
org.apache.hadoop.hbase.HBaseConfiguration  请问我还需要在那里加上依赖?

flink 创建hbase出错

2020-01-02 文章 lucas.wu
Hi 大家好
有个问题要问问大家,我现在用flink1.9版本创建hbase表
sql:
create table hbase_dimention_table(
 id varchar,
 info ROW(xxx)
)with(
'connector.type' = 'hbase',
 'connector.version' = '1.4.3', 
 'connector.table-name' = '', 
 'connector.zookeeper.quorum' = ‘xxx'
);
接着把flink-hbase_2.11-1.9.0.jar 放到了lib目录下,但是在执行的时候出现这种错误
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: SQL validation failed. findAndCreateTableSource failed
Caused by: java.lang.ClassNotFoundException: 
org.apache.hadoop.hbase.HBaseConfiguration




请问我还需要在那里加上依赖?

Re: flink 维表关联

2019-12-25 文章 lucas.wu
Hi 李现
现实确实很难做到对流表进行全量的join,如需全量,state会占用很大的存储,而且后续迁移很困难。请问一下你说的这个方案可以举个例子吗?


原始邮件
发件人:李现stormallin2...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2019年12月26日(周四) 08:44
主题:Re: flink 维表关联


流的大小应该不是无限制的,应该是有个窗口期?窗口期之外的数据离线处理? xin Destiny 
nj18652727...@gmail.com于2019年12月25日 周三18:13写道:  Hi,lucas.wu:   
我个人觉得可以把join的条件和流对应的数据存放在mapstate中,每次维表的缓存更新数据之后,去mapstate中查询,如果存在对应的KV,将新关联后的数据下发;
  不过这样state会占用很大的内存,需要主意state的清理   lucas.wu lucas...@xiaoying.com 
于2019年12月25日周三 下午5:13写道:hi all:   
flink的kafka流表与hbase维表关联,维表后面有变动的话,如何将之前关联过的数据进行更新?

flink 维表关联

2019-12-25 文章 lucas.wu
hi all:
flink的kafka流表与hbase维表关联,维表后面有变动的话,如何将之前关联过的数据进行更新?

使用flink 做维表关联

2019-12-20 文章 lucas.wu
hi 大家好:
最近有在调研使用flink做实时数仓,但是有个问题没弄清楚,就是明细表和维度表做join的时候,该采取什么的方案?目前的想到的就是明细表通过流消费进来,维度表放缓存。但是这种方案有弊端,就是维度表更新后,历史join过的数据无法再更新。不知道大家还有什么其他的方案?ps:目前有看到flink有支持join,这种需要两个表都是流的方式进入flink,然后会将历史的数据保存在state里面,这种对于量大的表会不会有问题?

Re: Flink RetractStream如何转成AppendStream?

2019-12-08 文章 lucas.wu
可以使用类似的方式
//   val sstream = result4.toRetractStream[Row],filter(_.1==trye).map(_._2)
//   val result5 = tEnv.fromDataStream(sstream)
//   result5.toAppendStream[Row].print()


原始邮件
发件人:Jark wuimj...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2019年12月8日(周日) 11:53
主题:Re: Flink RetractStream如何转成AppendStream?


Hi, 目前 Kafka 只支持 append mode,所以无法消费 retract stream。在 Flink 1.11,社区将计划支持 
RetractStream 转成 AppendStream 的操作,以 changelog 的方式输出。 Best, Jark On Sun, 8 Dec 
2019 at 10:08, 陈帅 casel.c...@gmail.com wrote:  在用Flink做实时数仓时遇到group 
by统计后需要将结果发到kafka,但是现在的kafka   
sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka?

回复:flink检查点状态大小

2019-11-26 文章 lucas.wu
你这个用了distinct的话,肯定就是针对全局的id进行distinct了,设置state也无效。
建议自己使用bigmap或者hyperlog算法实现一个distinct,这样可以节省内存


原始邮件
发件人:谷歌-akulakuzhan...@akulaku.com
收件人:user-zh@flink.apache.orguser...@flink.apache.org
发送时间:2019年11月26日(周二) 20:05
主题:flink检查点状态大小


streamTableEnv.getConfig().setIdleStateRetentionTime(Time.minutes(15), 
Time.minutes(20)); 
我在程序中设置状态保留时间,然后用全局group进行计算,但是过期状态没有清理导致状态也来越大,最终内存溢出,请问这是什么原因导致的 运行SQL select 
count(distinct id) as user_count,adjust_time from (select data.f13 as 
country_id,data.f1 as 
id,concat(DATE_FORMAT(FROM_UNIXTIME(data.f12/1000),'-MM-dd HH:mm'),':00') 
as adjust_time from userActionLog3) access_user_count where country_id='1' 
group by adjust_time 发送自 Windows 10 版邮件应用