Re: 滑动窗口数据存储多份问题
Hi 现在的实现是这样的,每条数据会在每个窗口中存一份 Best, Congxian 张浩 <13669299...@163.com> 于2020年7月6日周一 下午12:49写道: > Hi,all! > 由于第一次咨询,我不确定上一份邮件大家是否收到。 > 想咨询下大家,为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide > 份? > > > | | > 张浩 > | > | > 13669299...@163.com > | > 签名由网易邮箱大师定制
?????? ??????FLINKSQL1.10????????????UV
sorry,group agg. tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7))??. ---- ??:"Benchao Li"https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time x <35907...@qq.com ??2020??7??6?? 11:15?? ??1.10.1??sinkwindow??count distinct??window??count distinct??windowgroupnbsp;DATE_FORMAT(rowtm, '-MM-dd') sql?? val rt_totaluv_view : Table = tabEnv.sqlQuery( """ SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT userkey) uv FROM source GROUP BY DATE_FORMAT(rowtm, '-MM-dd') """) tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view) val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view) .filter( line =gt; line._1 == true ).map( line =gt; line._2 ) val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp ) tabEnv.sqlUpdate( s""" INSERT INTO mysql_totaluv SELECT _1,MAX(_2) FROM $totaluvTabTmp GROUP BY _1 """) --nbsp;nbsp;-- ??:nbsp;"Benchao Li"https://issues.apache.org/jira/browse/FLINK-17942 x <35907...@qq.comgt; ??2020??7??3?? 4:34?? gt; checkpoint?? gt; gt; tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),??key gt; gt; gt; gt; gt; --amp;nbsp;amp;nbsp;-- gt; ??:amp;nbsp;"Jark Wu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html gt; amp;gt; amp;amp;gt; [2] gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; amp;gt; gt; https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; x <35907...@qq.comamp;amp;amp;gt; ??2020??6??17?? 11:14?? gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; gt; ??0??UV??UV?? gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; CREATE VIEW uv_per_10min AS gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; SELECTamp;amp;amp;amp;nbsp; gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; amp;amp;amp;amp;nbsp; gt; MAX(DATE_FORMAT(proctimeamp;amp;amp;amp;nbsp;, gt; amp;gt; '-MM-dd gt; amp;gt; amp;amp;gt; HH:mm:00'))amp;amp;amp;amp;nbsp;OVER w gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; AS time_str,amp;amp;amp;amp;nbsp; gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; amp;amp;amp;amp;nbsp; COUNT(DISTINCT user_id) OVER gt; w AS uv gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; FROM user_behavior gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN gt; UNBOUNDED gt; amp;gt; PRECEDING AND gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; CURRENT ROW); gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; ?? gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') gt; amp;gt; ?? gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; PS??1.10??DDL??CREATE VIEW?? gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; -- Best, Benchao Li -- Best, Benchao Li
flink cep result DataStream no data print
code: val inpurtDS = streamTableEnv.toAppendStream[BehaviorInfo](behaviorTable)inpurtDS.print()val pattern = Pattern.begin[BehaviorInfo]("start") .where(_.clickCount 7)val patternStream = CEP.pattern(inpurtDS, pattern) val result: DataStream[BehaviorInfo] = patternStream.process( new PatternProcessFunction[BehaviorInfo, BehaviorInfo]() { override def processMatch( matchPattern: util.Map[String, util.List[BehaviorInfo]], ctx: PatternProcessFunction.Context, out: Collector[BehaviorInfo]): Unit = { try { println( s""" |matchPattern: $matchPattern |util.List[BehaviorInfo]: ${matchPattern.get("start")} |""".stripMargin) out.collect(matchPattern.get("start").get(0)) } catch { case exception: Exception = println(exception) } } }) result.print() ??inpurtDS.print()pattern??result.print()PatternProcessFunctionprocessMatch?? Thanks a lot!
Re: 求助:FLINKSQL1.10实时统计累计UV
我看你的SQL里面并没有用到窗口呀,只是一个普通的聚合。 这种聚合需要设置合理的state retention[1]时间的,要不然状态默认是永远不清理的。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time x <35907...@qq.com> 于2020年7月6日周一 上午11:15写道: > 版本是1.10.1,最后sink的时候确实是一个window里面做count > distinct操作。请问是只要计算过程中含有一个window里面做count > distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,groupDATE_FORMAT(rowtm, > '-MM-dd') 这个sql对应的状态很大。代码如下: > val rt_totaluv_view : Table = tabEnv.sqlQuery( > """ > SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd HH:mm:00')) > time_str,COUNT(DISTINCT userkey) uv > FROM source > GROUP BY DATE_FORMAT(rowtm, '-MM-dd') > """) > tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view) > > val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view) > .filter( line = line._1 == true ).map( line = line._2 ) > > val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp ) > > tabEnv.sqlUpdate( > s""" > INSERT INTO mysql_totaluv > SELECT _1,MAX(_2) > FROM $totaluvTabTmp > GROUP BY _1 > """) > --原始邮件-- > 发件人:"Benchao Li" 发送时间:2020年7月3日(星期五) 晚上9:47 > 收件人:"user-zh" > 主题:Re: 求助:FLINKSQL1.10实时统计累计UV > > > > 你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题, > 这个已经在1.11中修复了。 > > [1] https://issues.apache.org/jira/browse/FLINK-17942 > > x <35907...@qq.com 于2020年7月3日周五 下午4:34写道: > > 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期, > > > 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。 > > > > > --nbsp;原始邮件nbsp;-- > 发件人:nbsp;"Jark Wu" 发送时间:nbsp;2020年6月18日(星期四) 中午12:16 > 收件人:nbsp;"user-zh" > 主题:nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > > > > 是的,我觉得这样子是能绕过的。 > > On Thu, 18 Jun 2020 at 10:34, x <35907...@qq.comgt; wrote: > > gt; 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗? > gt; val resTmpTab: Table = tabEnv.sqlQuery( > gt;nbsp;nbsp; """ > gt;nbsp;nbsp;nbsp;nbsp; SELECT > MAX(DATE_FORMAT(ts, '-MM-dd > HH:mm:00')) > gt; time_str,COUNT(DISTINCT userkey) uv > gt;nbsp;nbsp;nbsp;nbsp; FROM > user_behaviornbsp;nbsp;nbsp; GROUP BY > DATE_FORMAT(ts, '-MM-dd')nbsp;nbsp;nbsp; """) > gt; > gt; val > resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab) > gt;nbsp;nbsp; > .filter(line=amp;gt;line._1==true).map(line=amp;gt;line._2) > gt; > gt; val res= tabEnv.fromDataStream(resTmpStream) > gt; tabEnv.sqlUpdate( > gt;nbsp;nbsp; s""" > gt;nbsp;nbsp;nbsp;nbsp; INSERT INTO > rt_totaluv > gt;nbsp;nbsp;nbsp;nbsp; SELECT _1,MAX(_2) > gt;nbsp;nbsp;nbsp;nbsp; FROM $res > gt;nbsp;nbsp;nbsp;nbsp; GROUP BY _1 > gt;nbsp;nbsp;nbsp;nbsp; """) > gt; > gt; > gt; > --amp;nbsp;原始邮件amp;nbsp;-- > gt; 发件人:amp;nbsp;"Jark Wu" gt; 发送时间:amp;nbsp;2020年6月17日(星期三) 中午1:55 > gt; 收件人:amp;nbsp;"user-zh" amp;gt;; > gt; > gt; 主题:amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > gt; > gt; > gt; > gt; 在 Flink 1.11 中,你可以尝试这样: > gt; > gt; CREATE TABLE mysql ( > gt; amp;nbsp;amp;nbsp; time_str STRING, > gt; amp;nbsp;amp;nbsp; uv BIGINT, > gt; amp;nbsp;amp;nbsp; PRIMARY KEY (ts) NOT ENFORCED > gt; ) WITH ( > gt; amp;nbsp;amp;nbsp; 'connector' = 'jdbc', > gt; amp;nbsp;amp;nbsp; 'url' = > 'jdbc:mysql://localhost:3306/mydatabase', > gt; amp;nbsp;amp;nbsp; 'table-name' = 'myuv' > gt; ); > gt; > gt; INSERT INTO mysql > gt; SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00')), > COUNT(DISTINCTamp;nbsp; > gt; user_id) > gt; FROM user_behavior; > gt; > gt; On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.comamp;gt; > wrote: > gt; > gt; amp;gt; 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV, > gt; amp;gt; sink表这个样式 > gt; amp;gt; tm uv > gt; amp;gt; 2020/06/17 13:46:00 1 > gt; amp;gt; 2020/06/17 13:47:00 2 > gt; amp;gt; 2020/06/17 13:48:00 3 > gt; amp;gt; > gt; amp;gt; > gt; amp;gt; group by 日期的话,分钟如何获取 > gt; amp;gt; > gt; amp;gt; > gt; amp;gt; > > --amp;amp;nbsp;原始邮件amp;amp;nbsp;-- > gt; amp;gt; 发件人:amp;amp;nbsp;"Benchao Li"< > libenc...@apache.org > amp;amp;gt;; > gt; amp;gt; 发送时间:amp;amp;nbsp;2020年6月17日(星期三) 中午11:46 > gt; amp;gt; 收件人:amp;amp;nbsp;"user-zh"< > user-zh@flink.apache.org > amp;amp;gt;; > gt; amp;gt; > gt; amp;gt; 主题:amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > gt; amp;gt; > gt; amp;gt; > gt; amp;gt; > gt; amp;gt; Hi, > gt; amp;gt; 我感觉这种场景可以有两种方式, > gt; amp;gt; 1. 可以直接用group by + mini batch > gt; amp;gt; 2. window聚合 + fast emit > gt; amp;gt; > gt; amp;gt; 对于#1,group > by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, > '-MM-dd')。 > gt; amp;gt; 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] > 。同时,mini > batch的开启也需要 > gt; amp;gt; 用参数[2] 来打开。 > gt; amp;gt; > gt; amp;gt; 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 > gt; amp;gt; fast > emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: > gt; amp;gt; table.exec.emit.early-fire.enabled = true > gt; amp;gt; table.exec.emit.early-fire.delay = 60 s > gt; amp;gt;
滑动窗口数据存储多份问题
Hi,all! 由于第一次咨询,我不确定上一份邮件大家是否收到。 想咨询下大家,为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide 份? | | 张浩 | | 13669299...@163.com | 签名由网易邮箱大师定制
回复:【Flink的shuffle mode】
你好,可以参考下ExecutionConfigOptions,OptimizerConfigOptions和GlobalConfiguration,里面有比较清楚地介绍 -- 发件人:忝忝向仧 <153488...@qq.com> 发送时间:2020年7月6日(星期一) 12:16 收件人:user-zh 主 题:回复:【Flink的shuffle mode】 那就是说datasream默认模式就是pipeline,而批模式是batch,批的模式是存在shuffle情况下,需要等shuffle操作造成,才能发送到下游.那如果批应用有shuffle操作和没有shuffle的,是都要等这个shuffle操作完成了才能一起发给下游,还是说其他非shuffle操作完成了可以先发给下游,不用等shuffle操作完成一起再发送? 发自我的iPhone -- 原始邮件 -- 发件人: Jingsong Li
Re: 【Flink的shuffle mode】
pipeline:直接走网络传输,不buffer所有数据 batch:buffer所有数据,结束后一起发送 流一定是pipeline 批可以是pipeline(更好的性能),也可以是batch(更好的容错和更简单的资源申请) Best, Jingsong On Mon, Jul 6, 2020 at 12:16 PM 忝忝向仧 <153488...@qq.com> wrote: > > 那就是说datasream默认模式就是pipeline,而批模式是batch,批的模式是存在shuffle情况下,需要等shuffle操作造成,才能发送到下游.那如果批应用有shuffle操作和没有shuffle的,是都要等这个shuffle操作完成了才能一起发给下游,还是说其他非shuffle操作完成了可以先发给下游,不用等shuffle操作完成一起再发送? > > > > 发自我的iPhone > > > -- 原始邮件 -- > 发件人: Jingsong Li 发送时间: 2020年7月6日 11:03 > 收件人: user-zh 主题: 回复:【Flink的shuffle mode】 > > > > Hi, > > 现在就两种:pipeline和batch > > batch的话是block住,直到执行完毕才发给下游的,所以这个shuffle mode一般只对批作业有用。 > > 理论上可以per transformation的来设置,see PartitionTransformation. > > Best, > Jingsong > > On Sun, Jul 5, 2020 at 10:48 PM 忝忝向仧 <153488...@qq.com wrote: > > Hi,all: > > > 看Flink源码时候,在应用中使用keyBy后,源码的transformations会有shuffle mode方法,这个shuffle > mode看是UNDEFINED的。 > 那么,shuffle mode有哪些方式?在应用里面可以设置么? > > > 谢谢. > > > > -- > Best, Jingsong Lee -- Best, Jingsong Lee
回复:【Flink的shuffle mode】
那就是说datasream默认模式就是pipeline,而批模式是batch,批的模式是存在shuffle情况下,需要等shuffle操作造成,才能发送到下游.那如果批应用有shuffle操作和没有shuffle的,是都要等这个shuffle操作完成了才能一起发给下游,还是说其他非shuffle操作完成了可以先发给下游,不用等shuffle操作完成一起再发送? 发自我的iPhone -- 原始邮件 -- 发件人: Jingsong Li
flink sql??????????????????
??sql?? select day, count(id), sum(v1) from ( select day , id , sum(v1) v1 from source group by day, id )t group by day tConfig.setIdleStateRetentionTime(Time.minutes(1440),Time.minutes(1450)) ??id??14??checkpoint ??1.10.0
flink1.10 定义表时,把json数组声明成STRING类型的,查询出来是空
Hi, 可以通过以下步骤还原车祸现场: kafka topic: test_action kafka message: {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } 代码Problem2.java: package com.flink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; /** * * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, * 那么在eval方法接收到的就是Row[], * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL * * 现在思路:就是在定义表的时候,把ARRYA看成STRING, * 现在的问题,就是查询出来,都是空 * * kafka topic: test_action * * kafka message: * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } */ public class Problem2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings); bsEnv.registerFunction("explode3", new ExplodeFunction()); String ddlSource = "CREATE TABLE actionTable3 (\n" + "action STRING\n" + ") WITH (\n" + "'connector.type' = 'kafka',\n" + "'connector.version' = '0.11',\n" + "'connector.topic' = 'test_action',\n" + "'connector.startup-mode' = 'earliest-offset',\n" + "'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + "'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + "'update-mode' = 'append',\n" + "'format.type' = 'json',\n" + "'format.derive-schema' = 'false',\n" + "'format.json-schema' = '{\"type\": \"object\", \"properties\": {\"action\": {\"type\": \"string\"} } }'" + ")"; System.out.println(ddlSource); bsEnv.sqlUpdate(ddlSource); Table table = bsEnv.sqlQuery("select * from actionTable3"); //Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL TABLE(explode3(`action`)) as T(`word`)"); table.printSchema(); bsEnv.toAppendStream(table, Row.class) .print();// 输出都是空 bsEnv.execute("ARRAY tableFunction Problem"); } }
?????? ??????FLINKSQL1.10????????????UV
??1.10.1??sinkwindow??count distinct??window??count distinct??windowgroupDATE_FORMAT(rowtm, '-MM-dd') sql?? val rt_totaluv_view : Table = tabEnv.sqlQuery( """ SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT userkey) uv FROM source GROUP BY DATE_FORMAT(rowtm, '-MM-dd') """) tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view) val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view) .filter( line = line._1 == true ).map( line = line._2 ) val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp ) tabEnv.sqlUpdate( s""" INSERT INTO mysql_totaluv SELECT _1,MAX(_2) FROM $totaluvTabTmp GROUP BY _1 """) ---- ??:"Benchao Li"https://issues.apache.org/jira/browse/FLINK-17942 x <35907...@qq.com ??2020??7??3?? 4:34?? checkpoint?? tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),??key --nbsp;nbsp;-- ??:nbsp;"Jark Wu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html gt; amp;gt; [2] gt; amp;gt; gt; amp;gt; gt; https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html gt; amp;gt; gt; amp;gt; x <35907...@qq.comamp;amp;gt; ??2020??6??17?? 11:14?? gt; amp;gt; gt; amp;gt; amp;amp;gt; ??0??UV??UV?? gt; amp;gt; amp;amp;gt; CREATE VIEW uv_per_10min AS gt; amp;gt; amp;amp;gt; SELECTamp;amp;amp;nbsp; gt; amp;gt; amp;amp;gt; amp;amp;amp;nbsp; MAX(DATE_FORMAT(proctimeamp;amp;amp;nbsp;, gt; '-MM-dd gt; amp;gt; HH:mm:00'))amp;amp;amp;nbsp;OVER w gt; amp;gt; amp;amp;gt; AS time_str,amp;amp;amp;nbsp; gt; amp;gt; amp;amp;gt; amp;amp;amp;nbsp; COUNT(DISTINCT user_id) OVER w AS uv gt; amp;gt; amp;amp;gt; FROM user_behavior gt; amp;gt; amp;amp;gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED gt; PRECEDING AND gt; amp;gt; amp;amp;gt; CURRENT ROW); gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; ?? gt; amp;gt; amp;amp;gt; PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') gt; ?? gt; amp;gt; amp;amp;gt; PS??1.10??DDL??CREATE VIEW?? gt; amp;gt; amp;amp;gt; -- Best, Benchao Li
Re: 【Flink Join内存问题】
regular join确实是这样,所以量大的话可以用interval join 、temporal join > 2020年7月5日 下午3:50,忝忝向仧 <153488...@qq.com> 写道: > > Hi,all: > > 我看源码里写到JoinedStreams: > 也就是说join时候都是走内存计算的,那么如果某个stream的key值过多,会导致oom > 那么有什么预防措施呢? > 将key值多的一边进行打散? > > > Right now, the join is being evaluated in memory so you need to ensure that > the number > * of elements per key does not get too high. Otherwise the JVM might crash.
Re: 【Flink的shuffle mode】
Hi, 现在就两种:pipeline和batch batch的话是block住,直到执行完毕才发给下游的,所以这个shuffle mode一般只对批作业有用。 理论上可以per transformation的来设置,see PartitionTransformation. Best, Jingsong On Sun, Jul 5, 2020 at 10:48 PM 忝忝向仧 <153488...@qq.com> wrote: > Hi,all: > > > 看Flink源码时候,在应用中使用keyBy后,源码的transformations会有shuffle mode方法,这个shuffle > mode看是UNDEFINED的。 > 那么,shuffle mode有哪些方式?在应用里面可以设置么? > > > 谢谢. -- Best, Jingsong Lee
flink 1.11 作业执行异常
Hi, 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常: org.apache.flink.table.api.TableExecution: Failed to execute sql caused by : java.lang.IlleagalStateException: No ExecutorFactory found to execute the application. at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84) 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
Re: flink1.10在通过TableFunction实现行转列时,Row一直是空
Hi, 我现在转换思路,就是在定义表的时候,把ARRYA看成STRING, 那么,现在的问题,就是查询出来,都是空。 基于上面的代码环境,新写了一个类 Problem2.java package com.flink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; /** * * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, * 那么在eval方法接收到的就是Row[], * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL * * 现在思路:就是在定义表的时候,把ARRYA看成STRING, * 现在的问题,就是查询出来,都是空 * * kafka topic: test_action * * kafka message: * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } */ public class Problem2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings); bsEnv.registerFunction("explode3", new ExplodeFunction()); String ddlSource = "CREATE TABLE actionTable3 (\n" + "action STRING\n" + ") WITH (\n" + "'connector.type' = 'kafka',\n" + "'connector.version' = '0.11',\n" + "'connector.topic' = 'test_action',\n" + "'connector.startup-mode' = 'earliest-offset',\n" + "'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + "'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + "'update-mode' = 'append',\n" + "'format.type' = 'json',\n" + "'format.derive-schema' = 'false',\n" + "'format.json-schema' = '{\"type\": \"object\", \"properties\": {\"action\": {\"type\": \"string\"} } }'" + ")"; System.out.println(ddlSource); bsEnv.sqlUpdate(ddlSource); Table table = bsEnv.sqlQuery("select * from actionTable3"); //Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL TABLE(explode3(`action`)) as T(`word`)"); table.printSchema(); bsEnv.toAppendStream(table, Row.class) .print();// 输出都是空 bsEnv.execute("ARRAY tableFunction Problem"); } } Jark Wu 于2020年7月6日周一 上午10:36写道: > Hi, > > 当前还不支持 Row[] 作为参数。目前有一个 issue 在解决这个问题,可以关注下。 > https://issues.apache.org/jira/browse/FLINK-17855 > > > Best, > Jark > > On Mon, 6 Jul 2020 at 10:19, Jim Chen wrote: > > > 大家好: > > 我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。 > > 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, > > 那么在eval方法接收到的就是Row[], > > 问题出在,Row[]中的数据获取不到,里面的元素都是NULL > > > > 通过下面的步骤和代码可还原车祸场景: > > kafka topic: test_action > > kafka message: > > {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": > > "id002", "actionName": "bbb"} ] } > > > > 代码1:Problem.java > > package com.flink; > > > > import > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > > import org.apache.flink.table.api.EnvironmentSettings; > > import org.apache.flink.table.api.Table; > > import org.apache.flink.table.api.java.StreamTableEnvironment; > > import org.apache.flink.types.Row; > > > > /** > > * > > * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, > > * 那么在eval方法接收到的就是Row[], > > * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL > > * > > * kafka topic: test_action > > * > > * kafka message: > > * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": > > "id002", "actionName": "bbb"} ] } > > */ > > public class Problem { > > > > public static void main(String[] args) throws Exception { > > StreamExecutionEnvironment env = > > StreamExecutionEnvironment.getExecutionEnvironment(); > > EnvironmentSettings envSettings = > EnvironmentSettings.newInstance() > > .useBlinkPlanner() > > .inStreamingMode() > > .build(); > > StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, > > envSettings); > > bsEnv.registerFunction("explode2", new ExplodeFunction()); > > > > String ddlSource = "CREATE TABLE actionTable (\n" + > > "action ARRAY<\n" + > > " ROW<" + > > " actionID STRING,\n" + > > " actionName STRING\n" + > > " >\n" + > > " >\n" + > > ") WITH (\n" + > > "'connector.type' = 'kafka',\n" + > > "'connector.version' = '0.11',\n" + > > "'connector.topic' = 'test_action',\n" + > > "'connector.startup-mode' = 'earliest-offset',\n"
Re: flink1.10在通过TableFunction实现行转列时,Row一直是空
Hi, 当前还不支持 Row[] 作为参数。目前有一个 issue 在解决这个问题,可以关注下。 https://issues.apache.org/jira/browse/FLINK-17855 Best, Jark On Mon, 6 Jul 2020 at 10:19, Jim Chen wrote: > 大家好: > 我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。 > 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, > 那么在eval方法接收到的就是Row[], > 问题出在,Row[]中的数据获取不到,里面的元素都是NULL > > 通过下面的步骤和代码可还原车祸场景: > kafka topic: test_action > kafka message: > {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": > "id002", "actionName": "bbb"} ] } > > 代码1:Problem.java > package com.flink; > > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > > /** > * > * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, > * 那么在eval方法接收到的就是Row[], > * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL > * > * kafka topic: test_action > * > * kafka message: > * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": > "id002", "actionName": "bbb"} ] } > */ > public class Problem { > > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings envSettings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, > envSettings); > bsEnv.registerFunction("explode2", new ExplodeFunction()); > > String ddlSource = "CREATE TABLE actionTable (\n" + > "action ARRAY<\n" + > " ROW<" + > " actionID STRING,\n" + > " actionName STRING\n" + > " >\n" + > " >\n" + > ") WITH (\n" + > "'connector.type' = 'kafka',\n" + > "'connector.version' = '0.11',\n" + > "'connector.topic' = 'test_action',\n" + > "'connector.startup-mode' = 'earliest-offset',\n" + > "'connector.properties.zookeeper.connect' = > 'localhost:2181',\n" + > "'connector.properties.bootstrap.servers' = > 'localhost:9092',\n" + > "'update-mode' = 'append',\n" + > "'format.type' = 'json'\n" + > ")"; > bsEnv.sqlUpdate(ddlSource); > > //Table table = bsEnv.sqlQuery("select `action` from actionTable"); > Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL > TABLE(explode2(`action`)) as T(`word`)"); > table.printSchema(); > bsEnv.toAppendStream(table, Row.class) > .print("==tb=="); > > > bsEnv.execute("ARRAY tableFunction Problem"); > } > } > > 代码2:ExplodeFunction.java > package com.flink; > > import org.apache.flink.table.functions.TableFunction; > import org.apache.flink.types.Row; > > import java.util.ArrayList; > import java.util.Arrays; > > public class ExplodeFunction extends TableFunction { > > public void eval(Row[] values) { > System.out.println(values.length); > if (values.length > 0) { > for (Row row : values) { > if (row != null) {// 这里debug出来的row总是空 > ArrayList list = new ArrayList<>(); > for (int i = 0; i < row.getArity(); i++) { > Object field = row.getField(i); > list.add(field); > } > > collector.collect(Row.of(Arrays.toString(list.toArray(; > } > } > } > } > } > > 最后贴个debug的图 > [image: image.png] >
flink1.10在通过TableFunction实现行转列时,Row一直是空
大家好: 我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, 那么在eval方法接收到的就是Row[], 问题出在,Row[]中的数据获取不到,里面的元素都是NULL 通过下面的步骤和代码可还原车祸场景: kafka topic: test_action kafka message: {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } 代码1:Problem.java package com.flink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; /** * * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, * 那么在eval方法接收到的就是Row[], * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL * * kafka topic: test_action * * kafka message: * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } */ public class Problem { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings); bsEnv.registerFunction("explode2", new ExplodeFunction()); String ddlSource = "CREATE TABLE actionTable (\n" + "action ARRAY<\n" + " ROW<" + " actionID STRING,\n" + " actionName STRING\n" + " >\n" + " >\n" + ") WITH (\n" + "'connector.type' = 'kafka',\n" + "'connector.version' = '0.11',\n" + "'connector.topic' = 'test_action',\n" + "'connector.startup-mode' = 'earliest-offset',\n" + "'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + "'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + "'update-mode' = 'append',\n" + "'format.type' = 'json'\n" + ")"; bsEnv.sqlUpdate(ddlSource); //Table table = bsEnv.sqlQuery("select `action` from actionTable"); Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL TABLE(explode2(`action`)) as T(`word`)"); table.printSchema(); bsEnv.toAppendStream(table, Row.class) .print("==tb=="); bsEnv.execute("ARRAY tableFunction Problem"); } } 代码2:ExplodeFunction.java package com.flink; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; import java.util.ArrayList; import java.util.Arrays; public class ExplodeFunction extends TableFunction { public void eval(Row[] values) { System.out.println(values.length); if (values.length > 0) { for (Row row : values) { if (row != null) {// 这里debug出来的row总是空 ArrayList list = new ArrayList<>(); for (int i = 0; i < row.getArity(); i++) { Object field = row.getField(i); list.add(field); } collector.collect(Row.of(Arrays.toString(list.toArray(; } } } } } 最后贴个debug的图 [image: image.png]
Re: 在一个 flink operator 中怎样根据消息内容动态生成多个监控指标?
Mark一下,我现在也有这样的需求 > 2020年7月4日 下午12:35,zhisheng 写道: > > 我猜你是想要将 table name 作为一个标签方便后期分组查询过滤? > > wangl...@geekplus.com.cn 于2020年7月3日周五 上午10:24写道: > >> public void invoke(ObjectNode node, Context context) throws Exception { >> >>String tableName = node.get("metadata").get("topic").asText(); >>Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, >> new MeterView(10)); >>meter.markEvent(); >>log.info("### counter: " + meter.toString() + "\t" + >> meter.getCount()); >> >> 如上面代码所示,在 invoke 方法中解析得到 tableName, 以 tableName 名字作为 metrics. >> 但这样写每一消息下来了后相当于重新定义了 这个 metrics , 又从 0 开始计数了。 >> >> 谢谢, >> 王磊 >> >> >> wangl...@geekplus.com.cn >> >> >> Sender: kcz >> Send Time: 2020-07-03 09:13 >> Receiver: wanglei2 >> Subject: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? >> 按照你的描述 你就是少了tablename,那么你解析log 得到了tablename又做metric就好了吧 >> >> >> >> -- 原始邮件 -- >> 发件人: 王磊2 >> 发送时间: 2020年7月2日 21:46 >> 收件人: user-zh , 17610775726 <17610775...@163.com> >> 主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? >> >> >> 没有明白你说的实现方式。 >> >> 我最终要得到类似的 Metrics: myCounter_table1, myCounter_table2, ..., >> myCounter_tableX >> 但我看代码中 Metrics 的初始化都是在 open 方法中的,在这个方法中我没法得到 tableName 是什么。 >> >> 谢谢, >> 王磊 >> >> >> >> -- >> 发件人:JasonLee <17610775...@163.com> >> 发送时间:2020年7月2日(星期四) 21:12 >> 收件人:user-zh >> 主 题:回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? >> >> 你把tablename传到下面metric里不就行了吗 >> >> >> | | >> JasonLee >> | >> | >> 邮箱:17610775...@163.com >> | >> >> Signature is customized by Netease Mail Master >> >> 在2020年07月02日 16:39,wangl...@geekplus.com.cn 写道: >> >> 全都是同一种类型的 metrics. >> 比如消息中是 mysql binlog 解析结果,我想要根据消息内容拿到 tableName, 按 tableName 生成不同名称的 >> metrics(但都是 meter 类型) >> >> 谢谢, >> 王磊 >> >> >> >> >> wangl...@geekplus.com.cn >> >> >> 发件人: JasonLee >> 发送时间: 2020-07-02 16:16 >> 收件人: user-zh >> 主题: 回复:在一个 flink operator 中怎样根据消息内容动态生成多个监控指标? >> 是要生成不同类型的metric吗 比如counter meter ? >> >> >> | | >> JasonLee >> | >> | >> 邮箱:17610775...@163.com >> | >> >> Signature is customized by Netease Mail Master >> >> 在2020年07月02日 15:34,wangl...@geekplus.com.cn 写道: >> >> 官网上的例子: >> >> public class MyMapper extends RichMapFunction { >> private transient Counter counter; >> @Override >> public void open(Configuration config) { >> this.counter = getRuntimeContext() >>.getMetricGroup() >>.counter("myCounter"); >> } >> @Override >> public String map(String value) throws Exception { >> this.counter.inc(); >> return value; >> } >> } >> >> 我想要根据 map 方法中传入的参数生成不同的 监控指标,怎样可以实现呢? >> >> 谢谢, >> 王磊 >> >> >> >> wangl...@geekplus.com.cn >> >> >>
答复: 回复:flink1.10 checkpoint 运行一段时间空指针异常
Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。 我之前提了个jira 描述了这个问题 https://issues.apache.org/jira/browse/FLINK-18196 修改了Checkpoint 相关代码后,在低版本 jdk 上也没有再发现过过NPE。如果实在不能升级 jdk 版本,可以参考下面的patch: https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19 -邮件原件- 发件人: zhisheng 发送时间: 2020年7月5日 15:01 收件人: user-zh 主题: Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常 生产集群 JDK 使用的地方比较多,不敢轻易换版本,后面再观察一下,如果频繁出现这种问题再考虑更换版本,感谢 Congxian Best! zhisheng Congxian Qiu 于2020年7月4日周六 下午3:21写道: > @zhisheng 你们有尝试过更换 jdk 版本吗?更换版本是否能解决这个问题呢? > > Best, > Congxian > > > zhisheng 于2020年7月4日周六 下午12:27写道: > > > 我们也有遇到过这个异常,但是不是很常见 > > > > Congxian Qiu 于2020年7月3日周五 下午2:08写道: > > > > > 你可以看看是否 FLINK-17479[1] 和你的问题一样,是的话,可以尝试修改一下 jdk 版本试试 > > > [1] https://issues.apache.org/jira/browse/FLINK-17479 > > > Best, > > > Congxian > > > > > > > > > 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空 > > > > > > > > > > > > > > > > > > > > > > > > 在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道: > > > > >你到具体的tm上找到相关的operator看看是不是有异常信息 > > > > > > > > > > > > > > >| | > > > > >JasonLee > > > > >| > > > > >| > > > > >邮箱:17610775...@163.com > > > > >| > > > > > > > > > >Signature is customized by Netease Mail Master > > > > > > > > > >在2020年07月01日 20:43,程龙 写道: > > > > >flink1.10上 程序运行几个小时后就会报不能执行checkpoint 空指针异常 具体如下: > > > > > > > > > > > > > > >java.lang.Exception: Could not perform checkpoint 3201 for operator > > > > Filter -> Map (2/8). > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816) > > > > > at org.apache.flink.streaming.runtime.io > > > > > > > > > > .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86) > > > > > at org.apache.flink.streaming.runtime.io > > > > > > > > > > .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177) > > > > > at org.apache.flink.streaming.runtime.io > > > > .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155) > > > > > at org.apache.flink.streaming.runtime.io > > > > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133) > > > > > at org.apache.flink.streaming.runtime.io > > > > > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469) > > > > > at > > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) > > > > > at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) > > > > > at java.lang.Thread.run(Thread.java:745) > > > > >Caused by: java.lang.NullPointerException > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:803) > > > > > > > > > >
??Flink??shuffle mode??
Hi,all: ??Flink??keyBy??transformationsshuffle mode??shuffle modeUNDEFINED ??shuffle mode .
??Flink Join??????????
Hi,all: ??JoinedStreams: join??stream??key??oom ? ??key Right now, the join is being evaluated in memory so you need to ensure that the number * of elements per key does not get too high. Otherwise the JVM might crash.
Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常
生产集群 JDK 使用的地方比较多,不敢轻易换版本,后面再观察一下,如果频繁出现这种问题再考虑更换版本,感谢 Congxian Best! zhisheng Congxian Qiu 于2020年7月4日周六 下午3:21写道: > @zhisheng 你们有尝试过更换 jdk 版本吗?更换版本是否能解决这个问题呢? > > Best, > Congxian > > > zhisheng 于2020年7月4日周六 下午12:27写道: > > > 我们也有遇到过这个异常,但是不是很常见 > > > > Congxian Qiu 于2020年7月3日周五 下午2:08写道: > > > > > 你可以看看是否 FLINK-17479[1] 和你的问题一样,是的话,可以尝试修改一下 jdk 版本试试 > > > [1] https://issues.apache.org/jira/browse/FLINK-17479 > > > Best, > > > Congxian > > > > > > > > > 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空 > > > > > > > > > > > > > > > > > > > > > > > > 在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道: > > > > >你到具体的tm上找到相关的operator看看是不是有异常信息 > > > > > > > > > > > > > > >| | > > > > >JasonLee > > > > >| > > > > >| > > > > >邮箱:17610775...@163.com > > > > >| > > > > > > > > > >Signature is customized by Netease Mail Master > > > > > > > > > >在2020年07月01日 20:43,程龙 写道: > > > > >flink1.10上 程序运行几个小时后就会报不能执行checkpoint 空指针异常 具体如下: > > > > > > > > > > > > > > >java.lang.Exception: Could not perform checkpoint 3201 for operator > > > > Filter -> Map (2/8). > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816) > > > > > at org.apache.flink.streaming.runtime.io > > > > > > > > > > .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86) > > > > > at org.apache.flink.streaming.runtime.io > > > > > > > > > > .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177) > > > > > at org.apache.flink.streaming.runtime.io > > > > .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155) > > > > > at org.apache.flink.streaming.runtime.io > > > > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133) > > > > > at org.apache.flink.streaming.runtime.io > > > > > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469) > > > > > at > > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) > > > > > at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) > > > > > at java.lang.Thread.run(Thread.java:745) > > > > >Caused by: java.lang.NullPointerException > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1382) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843) > > > > > at > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:803) > > > > > > > > > >