flink sql job 并行度
Hi all: 最近在使用flink sql 做一些计算任务,发现如果一条sql被解析成execute plan后可能会有多个job,但是这些job的并行度是一样的,目前来看,好像还不能对这些job进行并行度的调整,请问一下大家,有什么办法可能调整sql解析后的job的并行度呢?
回复:Flink 1.10 的 JDBCUpsertOutputFormat flush方法的重试机制无法生效
hi ,我们之前写mysql遇到过这种问题,因为任务的open函数是任务启动的时候调用的,是只调用一次的。而open函数里面会对jdbc connection进行初始化,当jdbc conection因为各种原因断开的时候,例如空闲时间超过max_idel_timeout。这都会导致flush失败,进而导致整个task重启。所以我们后面参照官方的这个JDBCUpsertOutputFormat 自己写了一个ouputFormat,加上了连接的检查和重连,如果对一致性要求不高的话,还可以对flush进行异常捕捉。 原始邮件 发件人:shangwen583767...@qq.com 收件人:user-zhuser...@flink.apache.org 抄送:kevin.shangwenkevin.shang...@gmail.com 发送时间:2020年3月23日(周一) 11:05 主题:Flink 1.10 的 JDBCUpsertOutputFormat flush方法的重试机制无法生效 我们在测试环境测试JDBC写入postgresql的场景,用tcpkill模拟链接被关闭的情况,测试对异常的兼容性,我们发现一直打印类似的日志 2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormatnbsp; - JDBC executeBatch error, retry times = 1 org.postgresql.util.PSQLException: This connection has been closed. at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857) at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817) at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813) at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873) at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569) at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62) at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159) at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 从日志上看来,链接被关闭,只会导致重试一次,这里是Flink的重试代码 //JDBCUpsertOutputFormat.javapublic synchronized void flush() throws Exception { checkFlushException(); for (int i = 1; i = maxRetryTimes; i++) { try { jdbcWriter.executeBatch(); batchCount = 0; break; } catch (SQLException e) { LOG.error("JDBC executeBatch error, retry times = {}", i, e); if (i gt;= maxRetryTimes) { throw e; } Thread.sleep(1000 * i); } } } 通过远程debug分析,在第一次执行 JDBCUpsertOutputFormat.flush nbsp; -gt; AppendOnlyWriter.executeBatch nbsp; nbsp; nbsp;... nbsp; nbsp; nbsp;-gt; PgConnection.getAutoCommit 抛出PSQLException: This connection has been closed时,batchStatements在这之前已经被清空 // PgStatement.java private BatchResultHandler internalExecuteBatch() throws SQLException { // Construct query/parameter arrays. transformQueriesAndParameters(); // Empty arrays should be passed to toArray // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/ Query[] queries = batchStatements.toArray(new Query[0]); ParameterList[] parameterLists = batchParameters.toArray(new ParameterList[0]); batchStatements.clear(); // 这里已经被清空 batchParameters.clear(); ... if (connection.getAutoCommit()) { // 抛出异常 flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN; } ... } 所以在Flink第二次重试执行这段代码的时候jdbcWriter.executeBatch的时候,batchStatements已经为Empty并直接返回了,导致Flink认为是成功的,这样会导致其实语句没有被执行? // PgStatement.java public int[] executeBatch() throws SQLException { checkClosed(); closeForNextExecution(); if (batchStatements == null || batchStatements.isEmpty()) { //这里就直接返回了 return new int[0]; } return internalExecuteBatch().getUpdateCount(); } 目前的想法是,当我出现异常捕获的时候,在重试之前,判断这个链接是否已经被关闭,如果已经被关闭,则重新open,不知道大家有啥看法,这是我提的issue https://issues.apache.org/jira/browse/FLINK-16708
Re: rowtime 的类型序列化问题
是的 使用的是blink planner。因为我基于flink的基础上又做了一些简单的开发,所以sinkTable的schmea我是先读取了Select * from source_table, 然后把它注册成了一个临时表,然后把这个临时表的schema赋给sinktable,sinkTable同时也继承了RetractStreamTableSink[Row]。 这是他的一个operator连接图 Source: KafkaTableSource(SeqNo, Type, Table, ServerId, Database, OldData, GTID, Data, Timestamp, Offset) - SourceConversion(table=[default_catalog.default_database.source_table, source: [KafkaTableSource(SeqNo, Type, Table, ServerId, Database, OldData, GTID, Data, Timestamp, Offset)]], fields=[SeqNo, Type, Table, ServerId, Database, OldData, GTID, Data, Timestamp, Offset]) - Calc(select=[SeqNo, Type, Table, ServerId, Database, OldData, GTID, Data, Timestamp, Offset, from_unixtime((Data.FuiUpdateTime / 1000)) AS FuiUpdateTimeSec, (from_unixtime((Data.FuiUpdateTime / 1000)) TO_TIMESTAMP _UTF-16LE'-MM-dd HH:mm:ss') AS event_ts]) - WatermarkAssigner(rowtime=[event_ts], watermark=[(event_ts - 6:INTERVAL SECOND)]) - Calc(select=[event_ts]) - SinkConversionToTuple2 - Sink: ConsoleTableSink(event_ts) 目前从报错信息看,可能是SinkConversionToTuple2这个operator有点问题。 这个算子的 inTypeInfo是BaseRow(event_ts: TIMESTAMP(3) *ROWTIME*) outTypeInfo是Java Tuple2Boolean, Row(event_ts: TimeIndicatorTypeInfo(rowtime)) 这两种对TimeIndicatorTypeInfo序列化方式是不一样的。 一个使用BaseRowSerializer会将TimeIndicatorTypeInfo的序列化方式设置成SqlTimestampSerializer,而在RowTypeInfo会使用LongSerializer。 所以我猜测是这里出现了问题。 原始邮件 发件人:Jark wuimj...@gmail.com 收件人:user-zhuser...@flink.apache.org 发送时间:2020年3月20日(周五) 14:21 主题:Re: rowtime 的类型序列化问题 Hi, 请问使用的是 blink planner 么?可以把 sinkTable 的定义也发一下吗? Best, Jark On Fri, 20 Mar 2020 at 11:40, lucas.wu lucas...@xiaoying.com wrote: Hi all: 建表语句 create table `source_table`( `SeqNo` varchar, `Type` varchar, `Table` varchar, `ServerId` varchar, `Database` varchar, `OldData` varchar, `GTID` varchar, `Offset` varchar, `event_ts` as to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'), WATERMARK FOR event_ts AS event_ts - interval '60' second ) with(…)查询语句 insert into sinkTable from Select * from source_table; 报错信息: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32) at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93) at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at SinkConversion$51.processElement(Unknown Source) …… 最后查看代码,发现对于rowtime,在BaseRowTypeInfo下会是使用SqlTimestampSerializer,而在RowTypeInfo会使用LongSerializer,上下游使用serializer不一样,上游使用SqlTimestampSerializer下游使用LongSerializer就会报错。 请问这个问题可以避免吗?
Re: flink sql 去重算法
可以考虑自己实现一个udf ,使用bitmap或者hyperloglog去实现。 原始邮件 发件人:zhishengzhisheng2...@gmail.com 收件人:user-zhuser...@flink.apache.org 发送时间:2020年3月20日(周五) 11:44 主题:Re: flink sql 去重算法 hi, 我发现我们生产有些使用 SQL 的 count distinct 去做去重,当作业跑了很久,作业的 Checkpoint state 很大(我这周就遇到过一个差不多 400G 的,导致 Checkpoint 很容易超时,并且可能会对 HDFS 集群的网卡也有一定的压力),我看官网文档有介绍说使用 query_configuration ">https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html ,除此之外不清楚大家是否还有什么其他好的解决方法? Benchao Li libenc...@gmail.com 于2020年3月20日周五 上午9:50写道: Hi hiliuxg, count distinct 用的MapVIew来做的去重: 在batch场景下,MapView的底层实现就是HashMap; 在streaming场景下,MapView的底层实现是MapState,因为必须要用到state+cp,才能保证任务重启后状态不会丢失。 hiliuxg 736742...@qq.com 于2020年3月19日周四 下午11:31写道:hi all: 请问flink sqlnbsp; count(disitinct)nbsp; 底层的算法是怎样的? 是bitmap ? 还是简单通过java的set容器去重的呢? -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
rowtime 的类型序列化问题
Hi all: 建表语句 create table `source_table`( `SeqNo` varchar, `Type` varchar, `Table` varchar, `ServerId` varchar, `Database` varchar, `OldData` varchar, `GTID` varchar, `Offset` varchar, `event_ts` as to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'), WATERMARK FOR event_ts AS event_ts - interval '60' second ) with(…) 查询语句 insert into sinkTable from Select * from source_table; 报错信息: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32) at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93) at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at SinkConversion$51.processElement(Unknown Source) …… 最后查看代码,发现对于rowtime,在BaseRowTypeInfo下会是使用SqlTimestampSerializer,而在RowTypeInfo会使用LongSerializer,上下游使用serializer不一样,上游使用SqlTimestampSerializer下游使用LongSerializer就会报错。 请问这个问题可以避免吗?
转发:Re: sql关键字问题
原始邮件 发件人:lucas.wulucas...@xiaoying.com 收件人:imj...@gmail.com 发送时间:2020年3月18日(周三) 17:21 主题:Re: sql关键字问题 Hi,jark 看到了你修复的这个jirahttps://issues.apache.org/jira/browse/FLINK-16526 但是看了你的代码和描述,你只是针对SqlBasicCall这种node的字段名加了`` ,也就是说只会对有computed_column_expression的字段加上,但是对于普通的字段并没有覆盖到,请问我理解的正确吗? 原始邮件 发件人:Kurt youngykt...@gmail.com 收件人:user-zhuser...@flink.apache.org 抄送:Yuzhao chenyuzhao@gmail.com 发送时间:2020年3月18日(周三) 16:41 主题:Re: sql关键字问题 好像已经有了,应该是这个jira: https://issues.apache.org/jira/browse/FLINK-16526 Best, Kurt On Wed, Mar 18, 2020 at 4:19 PM Jingsong Li jingsongl...@gmail.com wrote: Hi lucas, 赞专业的分析,看起来是Flink的bug,你可以建个Jira来跟踪。 CC: @Yuzhao Chen yuzhao@gmail.com Best, Jingsong Lee On Wed, Mar 18, 2020 at 4:15 PM lucas.wu lucas...@xiaoying.com wrote:初步找到了原因 原来我的建表语句用了computed_column_expression 这种语义。 然后flink内部在使用的时候其实是把它转成了select 语句 ... if (columnExprs.nonEmpty) { val fieldExprs = fieldNames .map { name = if (columnExprs.contains(name)) { columnExprs(name) } else { name } }.toArray val rexNodes = toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs) ….. 然后我们看看convertToRexNodes方法 public RexNode[] convertToRexNodes(String[] exprs) { …. String query = String.format(QUERY_FORMAT, String.join(",", exprs)); SqlNode parsed = planner.parser().parse(query); } 重点就在这个QUERY_FORMAT private static final String QUERY_FORMAT = "SELECT %s FROM " + TEMPORARY_TABLE_NAME; 这样写是有问题的,当我的字段本身是有``的时候,就被去掉了,导致后面valid的时候就报错。 所以这个是算flink的bug吗? 原始邮件 发件人:lucas.wulucas...@xiaoying.com 收件人:user-zhuser...@flink.apache.org 发送时间:2020年3月18日(周三) 15:36 主题:sql关键字问题 create table `source_table`( `SeqNo` varchar, `Type` varchar, `Table` varchar, `ServerId` varchar, `Database` varchar, `OldData` varchar, `GTID` varchar, `Offset` varchar, `event_ts` as to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'), WATERMARK FOR event_ts AS event_ts - interval '60' second ) with(…) 查询语句 Select * from source_table; 这是我的建表和查询语句,发现即使用了`` 查询的时候还是会报Table是关键字的问题。 SQL parse failed. Encountered "Table" at line 1,column 19. 但是一旦我把 `event_ts` as to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'), WATERMARK FOR event_ts AS event_ts - interval '60’ second 这两行去掉 ,就正常了。是我的使用方法有问题吗? -- Best, Jingsong Lee
回复:sql关键字问题
初步找到了原因 原来我的建表语句用了computed_column_expression 这种语义。 然后flink内部在使用的时候其实是把它转成了select 语句 ... if (columnExprs.nonEmpty) { val fieldExprs = fieldNames .map { name = if (columnExprs.contains(name)) { columnExprs(name) } else { name } }.toArray val rexNodes = toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs) ….. 然后我们看看convertToRexNodes方法 public RexNode[] convertToRexNodes(String[] exprs) { …. String query = String.format(QUERY_FORMAT, String.join(",", exprs)); SqlNode parsed = planner.parser().parse(query); } 重点就在这个QUERY_FORMAT private static final String QUERY_FORMAT = "SELECT %s FROM " + TEMPORARY_TABLE_NAME; 这样写是有问题的,当我的字段本身是有``的时候,就被去掉了,导致后面valid的时候就报错。 所以这个是算flink的bug吗? 原始邮件 发件人:lucas.wulucas...@xiaoying.com 收件人:user-zhuser...@flink.apache.org 发送时间:2020年3月18日(周三) 15:36 主题:sql关键字问题 create table `source_table`( `SeqNo` varchar, `Type` varchar, `Table` varchar, `ServerId` varchar, `Database` varchar, `OldData` varchar, `GTID` varchar, `Offset` varchar, `event_ts` as to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'), WATERMARK FOR event_ts AS event_ts - interval '60' second ) with(…) 查询语句 Select * from source_table; 这是我的建表和查询语句,发现即使用了`` 查询的时候还是会报Table是关键字的问题。 SQL parse failed. Encountered "Table" at line 1,column 19. 但是一旦我把 `event_ts` as to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'), WATERMARK FOR event_ts AS event_ts - interval '60’ second 这两行去掉 ,就正常了。是我的使用方法有问题吗?
sql关键字问题
create table `source_table`( `SeqNo` varchar, `Type` varchar, `Table` varchar, `ServerId` varchar, `Database` varchar, `OldData` varchar, `GTID` varchar, `Offset` varchar, `event_ts` as to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'), WATERMARK FOR event_ts AS event_ts - interval '60' second ) with(…) 查询语句 Select * from source_table; 这是我的建表和查询语句,发现即使用了`` 查询的时候还是会报Table是关键字的问题。 SQL parse failed. Encountered "Table" at line 1,column 19. 但是一旦我把 `event_ts` as to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'), WATERMARK FOR event_ts AS event_ts - interval '60’ second 这两行去掉 ,就正常了。是我的使用方法有问题吗?
回复:ddl
有相应的接口 可以参考hbase的实现 原始邮件 发件人:王志华a15733178...@163.com 收件人:user-zhuser...@flink.apache.org 发送时间:2020年3月13日(周五) 19:17 主题:ddl 目前FLINK中对于DDL这块,它都只能什么类型的技术作为源头表或者SINK 表呢,我也网上也仅仅看到了ddl mysql sink、ddl hbase sink等。还有其他类型的支持吗?如果不支持的话,是否flink开放了相关的接口,可以提供对其他类型技术的ddl语法支持呢?比如想做一个 ddl kudu sink之类的 | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制
回复:flink 长时间运行后出现报错
没人回复大概是之前没人遇到过这种问题,所以下午看了flink的代码,终于有了点头绪。 原因分析: 这个异常的原因就是在task出现异常之后,它需要调用updateTaskExecutionState(TaskExecutionState taskExecutionState)这个rpc接口去通知flink jobmanager 去改变对应task的状态并且重启task。但是呢,taskExecutionState这个参数里面有个error属性,当我的的task打出来的错误栈太多的时候,在序列化的之后超过了 rpc接口要求的最大数据大小(也就是maximum akka framesize),导致调用updateTaskExecutionState 这个rpc接口失败,jobmanager无法获知这个task已经fail 的状态,也无法重启。这就导致了一系列连锁反应,其中一个就是我的checkpoint一直失败,原因就是我的task其实已经释放了,但是jobmanger无法感知。 结论: 这个算不算flink的一个bug,对于task已经失效,但是无法通知到jobmanger,导致该task一直无法重启。 原始邮件 发件人:lucas.wulucas...@xiaoying.com 收件人:user-zhuser...@flink.apache.org 发送时间:2020年3月9日(周一) 11:06 主题:flink 长时间运行后出现报错 大家好: 我的flink程序的主要功能是从kafka消费数据,简单处理后,通过jdbc outputformat发往mysql,但是长时间运行后,报下面的错。请问是什么原因造成的,我增加参数就可以解决吗? 2020-03-08 06:10:30,480 WARN org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler - Could not create remote rpc invocation message. Failing rpc invocation because... java.io.IOException: The rpc invocation size 34500577 exceeds the maximum akka framesize. at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129) at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78) at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428) at org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458) at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154) at org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at akka.actor.ActorCell.invoke(ActorCell.scala:496) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2020-03-08 06:10:30,480 ERROR org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Caught exception while executing runnable in main thread. java.lang.reflect.UndeclaredThrowableException at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428) at org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458) at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154) at org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at
flink 长时间运行后出现报错
大家好: 我的flink程序的主要功能是从kafka消费数据,简单处理后,通过jdbc outputformat发往mysql,但是长时间运行后,报下面的错。请问是什么原因造成的,我增加参数就可以解决吗? 2020-03-08 06:10:30,480 WARN org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler - Could not create remote rpc invocation message. Failing rpc invocation because... java.io.IOException: The rpc invocation size 34500577 exceeds the maximum akka framesize. at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129) at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78) at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428) at org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458) at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154) at org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at akka.actor.ActorCell.invoke(ActorCell.scala:496) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2020-03-08 06:10:30,480 ERROR org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Caught exception while executing runnable in main thread. java.lang.reflect.UndeclaredThrowableException at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428) at org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458) at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154) at org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at akka.actor.ActorCell.invoke(ActorCell.scala:496) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.io.IOException: The rpc invocation size 34500577 exceeds the maximum akka
关于task异常的问题
Hi 大家好 最近有使用flink自带的jdbc outputformat 将flink处理后的数据写到mysql,但是如果我的数据格式有问题,比如超过mysql对应字段设置的大小,或者数据库出现问题,导致延时。这些问题都会导致这个task抛出异常,导致task fail,进而导致整个job从checkpoint重启。 我的问题是,如果我使用的是flink提供的outputformat,我是否可以catch 异常,并且忽略。如果没有,有没有其它好的办法?
回复:Flink 1.6 版本,RocksDBStateBackend ,增量 Checkpoint 目录问题
可能是失败的checkpoint目录,可以看看程序中间是不是有失败的checkpoint 原始邮件 发件人:lakeshenshenleifight...@gmail.com 收件人:user-zhuser...@flink.apache.org 发送时间:2020年1月16日(周四) 16:47 主题:Flink 1.6 版本,RocksDBStateBackend ,增量 Checkpoint 目录问题 我使用 Flink 1.6 版本,使用的增量 Checkpoint,我看官网说,默认的 Checkpoint 保留目录是1,也就是会保留一个最新完成的 Checkpoint 目录,但是在我的任务 Checkpoint 路径下面,居然有很多个 chk-xxx 目录,比如说 chk-86515,chk-37878,而且在这些目录下面,还有数据,这是什么原因呢。 对这个地方有点困惑,既然默认保留的目录是1了,为什么还有这么多 chk 目录呢。 期待你的回答
Re: flink 创建hbase出错
Hi: 这个方法之前试过了,确实有效的。但是有个地方不明白,就是sql解析这个是在driver端进行的,我的driver的jar包已经包含hbase相关的jar包,为什么还要在lib目录下加上? 原始邮件 发件人:Terry wangzjuwa...@gmail.com 收件人:user-zhuser...@flink.apache.org 发送时间:2020年1月3日(周五) 11:02 主题:Re: flink 创建hbase出错 Hi, flink-hbase_2.11-1.9.0.jar 只包括了flink对hbase读写的封装的类,并没有提供hbase client的类,你需要把hbaes client等相关的jar包提供出来放到 lib包里面。 Best, Terry Wang 2020年1月2日 16:54,lucas.wu lucas...@xiaoying.com 写道: Hi 大家好 有个问题要问问大家,我现在用flink1.9版本创建hbase表 sql: create table hbase_dimention_table( id varchar, info ROW(xxx) )with( 'connector.type' = 'hbase', 'connector.version' = '1.4.3', 'connector.table-name' = '', 'connector.zookeeper.quorum' = ‘xxx' ); 接着把flink-hbase_2.11-1.9.0.jar 放到了lib目录下,但是在执行的时候出现这种错误 org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. findAndCreateTableSource failed Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.HBaseConfiguration 请问我还需要在那里加上依赖?
flink 创建hbase出错
Hi 大家好 有个问题要问问大家,我现在用flink1.9版本创建hbase表 sql: create table hbase_dimention_table( id varchar, info ROW(xxx) )with( 'connector.type' = 'hbase', 'connector.version' = '1.4.3', 'connector.table-name' = '', 'connector.zookeeper.quorum' = ‘xxx' ); 接着把flink-hbase_2.11-1.9.0.jar 放到了lib目录下,但是在执行的时候出现这种错误 org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. findAndCreateTableSource failed Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.HBaseConfiguration 请问我还需要在那里加上依赖?
Re: flink 维表关联
Hi 李现 现实确实很难做到对流表进行全量的join,如需全量,state会占用很大的存储,而且后续迁移很困难。请问一下你说的这个方案可以举个例子吗? 原始邮件 发件人:李现stormallin2...@gmail.com 收件人:user-zhuser...@flink.apache.org 发送时间:2019年12月26日(周四) 08:44 主题:Re: flink 维表关联 流的大小应该不是无限制的,应该是有个窗口期?窗口期之外的数据离线处理? xin Destiny nj18652727...@gmail.com于2019年12月25日 周三18:13写道: Hi,lucas.wu: 我个人觉得可以把join的条件和流对应的数据存放在mapstate中,每次维表的缓存更新数据之后,去mapstate中查询,如果存在对应的KV,将新关联后的数据下发; 不过这样state会占用很大的内存,需要主意state的清理 lucas.wu lucas...@xiaoying.com 于2019年12月25日周三 下午5:13写道:hi all: flink的kafka流表与hbase维表关联,维表后面有变动的话,如何将之前关联过的数据进行更新?
flink 维表关联
hi all: flink的kafka流表与hbase维表关联,维表后面有变动的话,如何将之前关联过的数据进行更新?
使用flink 做维表关联
hi 大家好: 最近有在调研使用flink做实时数仓,但是有个问题没弄清楚,就是明细表和维度表做join的时候,该采取什么的方案?目前的想到的就是明细表通过流消费进来,维度表放缓存。但是这种方案有弊端,就是维度表更新后,历史join过的数据无法再更新。不知道大家还有什么其他的方案?ps:目前有看到flink有支持join,这种需要两个表都是流的方式进入flink,然后会将历史的数据保存在state里面,这种对于量大的表会不会有问题?
Re: Flink RetractStream如何转成AppendStream?
可以使用类似的方式 // val sstream = result4.toRetractStream[Row],filter(_.1==trye).map(_._2) // val result5 = tEnv.fromDataStream(sstream) // result5.toAppendStream[Row].print() 原始邮件 发件人:Jark wuimj...@gmail.com 收件人:user-zhuser...@flink.apache.org 发送时间:2019年12月8日(周日) 11:53 主题:Re: Flink RetractStream如何转成AppendStream? Hi, 目前 Kafka 只支持 append mode,所以无法消费 retract stream。在 Flink 1.11,社区将计划支持 RetractStream 转成 AppendStream 的操作,以 changelog 的方式输出。 Best, Jark On Sun, 8 Dec 2019 at 10:08, 陈帅 casel.c...@gmail.com wrote: 在用Flink做实时数仓时遇到group by统计后需要将结果发到kafka,但是现在的kafka sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka?
回复:flink检查点状态大小
你这个用了distinct的话,肯定就是针对全局的id进行distinct了,设置state也无效。 建议自己使用bigmap或者hyperlog算法实现一个distinct,这样可以节省内存 原始邮件 发件人:谷歌-akulakuzhan...@akulaku.com 收件人:user-zh@flink.apache.orguser...@flink.apache.org 发送时间:2019年11月26日(周二) 20:05 主题:flink检查点状态大小 streamTableEnv.getConfig().setIdleStateRetentionTime(Time.minutes(15), Time.minutes(20)); 我在程序中设置状态保留时间,然后用全局group进行计算,但是过期状态没有清理导致状态也来越大,最终内存溢出,请问这是什么原因导致的 运行SQL select count(distinct id) as user_count,adjust_time from (select data.f13 as country_id,data.f1 as id,concat(DATE_FORMAT(FROM_UNIXTIME(data.f12/1000),'-MM-dd HH:mm'),':00') as adjust_time from userActionLog3) access_user_count where country_id='1' group by adjust_time 发送自 Windows 10 版邮件应用