Re: env.readFile recursively monitoring a directory: how to clean up state for historical directories
Thanks for the suggestion! If I delete the HDFS directories, will the corresponding state in Flink be cleaned up as well? Sent from my iPhone
-- Original Message --
From: Jark Wu
https://issues.apache.org/jira/browse/FLINK-18357
My preliminary idea: could we have an inactive-interval that marks a subdirectory as one that will no longer produce new files, so that the checkpoint does not have to track every file under it?
Best,
Jark
On Wed, 17 Jun 2020 at 14:04, star <3149768...@qq.com> wrote:
env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 6)
The above is a source that monitors the data in a directory. The format is set to recursively monitor a parent directory A, whose children are date directories, e.g.:
A/20200101/
A/20200102/
A/20200103/
...
As time passes, by June we are monitoring nearly 200 directories with about 500 files each, so every checkpoint has to sync the consumption offsets of 200*500 files, and checkpoints frequently time out. Can this historical state be cleaned up? For example, the business knows that only the last 7 days of directories receive new data, so the older ones no longer need monitoring.
env.readFile recursively monitoring a directory: how to clean up state for historical directories
env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 6)
The above is a source that monitors the data in a directory. The format is set to recursively monitor a parent directory A, whose children are date directories, e.g.:
A/20200101/
A/20200102/
A/20200103/
...
As time passes, by June we are monitoring nearly 200 directories with about 500 files each, so every checkpoint has to sync the consumption offsets of 200*500 files, and checkpoints frequently time out. Can this historical state be cleaned up? For example, the business knows that only the last 7 days of directories receive new data, so the older ones no longer need monitoring.
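For readers hitting the same problem, a minimal sketch of the setup described above (the path and scan interval are made-up values). setNestedFileEnumeration(true) is what makes the format recurse into the date subdirectories, and the offset of every discovered file ends up in the source's checkpointed state, which is why the state grows with the directory count:

    import org.apache.flink.api.java.io.TextInputFormat
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode
    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val path = "hdfs:///A"                    // parent directory holding A/20200101/, A/20200102/, ...
    val format = new TextInputFormat(new Path(path))
    format.setNestedFileEnumeration(true)     // recurse into the date subdirectories

    env
      .readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 60000L) // rescan every 60 s
      .print()

    env.execute("monitor-directory")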
Re: Can TTL clear the state on a schedule?
But what about the result in the sink? For example, with select count(distinct product) from order, once the state is cleared would the result (say 100w) drop back to 0?
-- Original Message --
From: "Yichao Yang" <1048262...@qq.com>
Date: Wed, Jun 10, 2020, 2:45
To: "user-zh"
Can TTL clear the state on a schedule?
Hi, for example, can the state be cleared at 0:00 every day?
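For reference, a minimal sketch of how state TTL is configured in the DataStream API (the state name is illustrative). Note that StateTtlConfig expires an entry relative to its last write, not at a fixed wall-clock time, so an exact 0:00 reset would need timers or a date-based key instead:

    import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
    import org.apache.flink.api.common.time.Time

    val ttlConfig = StateTtlConfig
      .newBuilder(Time.days(1))                 // an entry lives one day after its last write
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build()

    val desc = new ValueStateDescriptor[java.lang.Long]("cnt", classOf[java.lang.Long])
    desc.enableTimeToLive(ttlConfig)            // attach the TTL to this state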
Re: flink 1.9: will the state inside a custom UDAF be checkpointed?
Got it, thanks.
-- Original Message --
From: "Benchao Li"
Re: flink 1.9: will the state inside a custom UDAF be checkpointed?
In the UDAF my accumulator holds an ArrayList. Will the ArrayList be included in the checkpoint?
-- Original Message --
From: "Benchao Li"
flink 1.9: will the state inside a custom UDAF be checkpointed?
Hi, in flink 1.9, will the state inside a custom UDAF (the accumulator) be checkpointed? I use it in SQL, in the aggregation stage:

    class MyFunc extends AggregateFunction {
      createAccumulator
      accumulate
      getValue
      merge
    }
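To make the skeleton above concrete, a minimal sketch of an AggregateFunction whose accumulator carries an ArrayList (class and field names are illustrative). In SQL the planner keeps the accumulator in the aggregation operator's managed state, so it is included in checkpoints:

    import java.lang.{Iterable => JIterable}
    import java.util.ArrayList
    import org.apache.flink.table.functions.AggregateFunction

    // accumulator: a running count plus the collected ids
    class MyAcc {
      var cnt: Long = 0L
      val ids: ArrayList[String] = new ArrayList[String]()
    }

    class MyFunc extends AggregateFunction[java.lang.Long, MyAcc] {
      override def createAccumulator(): MyAcc = new MyAcc

      def accumulate(acc: MyAcc, id: String): Unit = {
        acc.cnt += 1
        acc.ids.add(id)
      }

      override def getValue(acc: MyAcc): java.lang.Long = acc.cnt

      def merge(acc: MyAcc, it: JIterable[MyAcc]): Unit = {
        val iter = it.iterator()
        while (iter.hasNext) {
          val other = iter.next()
          acc.cnt += other.cnt
          acc.ids.addAll(other.ids)
        }
      }
    }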
Re: flink1.9 SQL: is state lost after restarting and restoring from a checkpoint?
Yes, it restored from the checkpoint, and the restore reported no error.
-- Original Message --
From: "Benchao Li"
Re: flink1.9 SQL: is state lost after restarting and restoring from a checkpoint?
---- ??:"star"<3149768...@qq.com; :2020??6??5??(??) 10:40 ??:"user-zh@flink.apache.org"
Re: flink1.9 SQL: is state lost after restarting and restoring from a checkpoint?
?? ---- ??:"zhiyezou"<1530130...@qq.com; :2020??6??5??(??) 10:31 ??:"user-zh"
flink1.9 SQL: is state lost after restarting and restoring from a checkpoint?
Hi, flink 1.9 with the blink planner. The requirement: count the distinct ids per month and city and write them to the HBase table monthtable (rowkey month+city), then sum those counts per city and write them to the HBase table totalTable (rowkey city). After the job restarted and restored from the latest checkpoint, the cnt results in totalTable and monthtable are inconsistent. The code:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, bsSettings)

    val myDataStream = ... // source
    // register the stream; fields: month, city, id
    tableEnv.registerDataStream("monthtable", myDataStream, 'month, 'city, 'id)

    // distinct id count per month and city
    val monthCount = tableEnv.sqlQuery(
      s"""
         |select month, city, count(distinct id) as cnt
         |from monthtable
         |group by month, city
       """.stripMargin)

    // write to HBase, rowkey = month + city
    monthCount.toRetractStream[Row].filter(_._1).map(line => {
      val row = line._2
      val month = row.getField(0).toString
      val city = row.getField(1).toString
      val cnt = row.getField(2).toString
      val map = new util.HashMap[String, String]()
      map.put("cnt", cnt)
      (month + city, map) // month+city as rowkey, cnt as column
    }).addSink(new MyHbaseSink("monthHbaseTable"))

    // register the monthly aggregate as monthStat
    tableEnv.registerTable("monthStat", monthCount)

    // sum per city over the monthly counts
    val totalCount = tableEnv.sqlQuery(
      s"""
         |select city, sum(cnt) as cityCnt
         |from monthStat
         |group by city
       """.stripMargin)

    // write to HBase, rowkey = city
    totalCount.toRetractStream[Row].filter(_._1).map(line => {
      val row = line._2
      val city = row.getField(0).toString
      val totalCnt = row.getField(1).toString
      val map = new util.HashMap[String, String]()
      map.put("totalCnt", totalCnt)
      (city, map)
    }).addSink(new MyHbaseSink("totalHbaseTable"))
Re: Is there a limit on the number of jobs in a yarn-session?
Is this governed by the -n option? I am on 1.10.
-- Original Message --
From:
Re: flink 1.9: questions about retract streams
If I switch to append-only output, will the state keep growing? And with count(distinct), won't append mode emit duplicate records downstream?
-- Original Message --
From: "LakeShen"
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=147427289#FLIP105:SupporttoInterpretChangeloginFlinkSQL(IntroducingDebeziumandCanalFormat)-CanalFormat
1048262223 <1048262...@qq.com> wrote on Jun 3, 2020, 2:59:
> Hi
> For a Flink RetractStream: if the sink does not support updates (like Kafka), you cannot sink the update messages of a RetractStream directly to Kafka.
>
> Best,
> Yichao Yang
>
> -- Original Message --
> From: "star" <3149768...@qq.com>
> Date: Wed, Jun 3, 2020, 2:47
> To: "user-zh@flink.apache.org"
Re: flink 1.9: questions about retract streams
My SQL is: select year,month,day,province,sub_name,sum(amount),count(*) as cou from mytable group by year,month,day,province,sub_name;
I want to write the aggregated result to a Kafka topic, then register that topic as a table and run further queries on it.
-- Original Message --
From: "godfrey he"
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=147427289#FLIP105:SupporttoInterpretChangeloginFlinkSQL(IntroducingDebeziumandCanalFormat)-CanalFormat
1048262223 <1048262...@qq.com> wrote on Jun 3, 2020, 2:59:
Hi
For a Flink RetractStream: if the sink does not support updates (like Kafka), you cannot sink the update messages of a RetractStream directly to Kafka.
Best,
Yichao Yang
-- Original Message --
From: "star" <3149768...@qq.com>
Date: Wed, Jun 3, 2020, 2:47
To: "user-zh@flink.apache.org"
flink 1.9: questions about retract streams
Can the output of toRetractStream be written to Kafka? Kafka only supports appending, so how is Flink's RetractStream supposed to be sinked there?
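As a rough sketch of the workaround discussed in this thread (it reuses the tableEnv and mytable from the question; the topic name and broker address are made up): convert the aggregate to a retract stream, drop the retraction (false-flagged) messages, and append only the accumulate messages to Kafka. The topic then holds an append-only changelog where the latest record per key wins:

    import java.util.Properties
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    val agg = tableEnv.sqlQuery(
      """
        |select year, month, day, province, sub_name,
        |       sum(amount) as amt, count(*) as cou
        |from mytable
        |group by year, month, day, province, sub_name
      """.stripMargin)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")   // assumed broker address

    agg
      .toRetractStream[Row]
      .filter(_._1)                  // keep accumulate messages, drop retractions
      .map(_._2.toString)
      .addSink(new FlinkKafkaProducer[String]("agg_topic", new SimpleStringSchema(), props))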
Re: Flink1.9 yarn session mode: Ask timed out error
Thanks, I will check the JM side.
-- Original Message --
From: "Benchao Li"
Flink1.9 yarn session mode: Ask timed out error
Hi, in Flink1.9 yarn session mode, cancelling a job fails with Ask timed out, and shell commands like flink list fail too. The error:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#180439734]] after [1 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Job cancellation timed out.]
Cancelling from the web UI also fails, with Server Response Message: Internal server error.
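One common mitigation for this error (assuming the dispatcher is merely overloaded rather than dead) is to raise the RPC and REST timeouts in flink-conf.yaml, for example:

    # flink-conf.yaml
    akka.ask.timeout: 60 s    # RPC timeout behind the AskTimeoutException
    web.timeout: 60000        # REST / web UI timeout, in milliseconds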
Re: flink sql JSON-parsing UDF
Thanks. Sent from my iPhone
-- Original Message --
From: Benchao Li
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=141724550
star <3149768...@qq.com> wrote on Fri, May 29, 2020 at 2:34 PM:
Does flink sql 1.10 ship a UDF for parsing JSON? I could not find one. Sent from my iPhone
--
Best,
Benchao Li
flink sql JSON-parsing UDF
Does flink sql 1.10 ship a UDF for parsing JSON? I could not find one. Sent from my iPhone
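In case it helps, a minimal sketch of a hand-rolled JSON-extracting scalar UDF using Jackson (the class and function names are illustrative, not a built-in):

    import com.fasterxml.jackson.databind.ObjectMapper
    import org.apache.flink.table.functions.ScalarFunction

    // json_get(json, field): returns the given top-level field as a string, or null
    class JsonGet extends ScalarFunction {
      @transient private lazy val mapper = new ObjectMapper()

      def eval(json: String, field: String): String =
        try {
          val node = mapper.readTree(json).get(field)
          if (node == null) null else node.asText()
        } catch {
          case _: Exception => null   // malformed JSON yields null
        }
    }

    // register it, then use it in SQL: select json_get(payload, 'userId') from src
    tableEnv.registerFunction("json_get", new JsonGet)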
Re: Re: Global state
Thanks for the suggestion, but that way the data gets duplicated, doubled. For now it looks like only an external store will do. Sent from my iPhone
-- Original Message --
From: a773807...@gmail.com
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/api_concepts.html#define-keys-using-key-selector-functions
Best,
tison.
star <3149768...@qq.com> wrote on Tue, May 26, 2020 at 6:42 PM:
> Is there a global state facility? I need to map the id and name in my data: if two records share the same id or the same name, they should map to the same value. Right now the only way I see is operator
> state with parallelism 1 to get a global state.
>
> Thanks
>
> Sent from my iPhone
Re: Global state
Thanks for the reply; implementing this with keyBy is tricky in the details.
id:1,name:A,value:A1
id:2,name:A,value:A2
id:1,name:B,value:A3
Of these three records the first arrives first; when the later two arrive, if their id or name matches the first record's id or name, their value should take the first record's value.
Output:
id:1,name:A,value:A1
id:2,name:A,value:A1
id:1,name:B,value:A1
My original idea was to keep a map whose keys are the ids or names and whose values are the mapped value. For the example above, the MapState would hold these k/v pairs:
key:1,value:A1
key:A,value:A1
key:2,value:A1
key:B,value:A1
Sent from my iPhone
-- Original Message --
From: tison
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/api_concepts.html#define-keys-using-key-selector-functions
Best,
tison.
star <3149768...@qq.com> wrote on Tue, May 26, 2020 at 6:42 PM:
Is there a global state facility? I need to map the id and name in my data: if two records share the same id or the same name, they should map to the same value. Right now the only way I see is operator state with parallelism 1 to get a global state.
Thanks
Sent from my iPhone
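For what it's worth, a minimal sketch of the MapState idea above as a single-parallelism operator. An in-memory map stands in for state here, so it is not fault tolerant as written (a real version would snapshot it via CheckpointedFunction); field names match the example records:

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.util.Collector
    import scala.collection.mutable

    case class Record(id: String, name: String, value: String)

    // must run with parallelism 1, matching the operator-state workaround
    class UnifyValue extends RichFlatMapFunction[Record, Record] {
      private val mapping = mutable.HashMap[String, String]()

      override def flatMap(in: Record, out: Collector[Record]): Unit = {
        // if the id or the name was seen before, reuse the earlier value
        val value = mapping.get(in.id).orElse(mapping.get(in.name)).getOrElse(in.value)
        mapping.put(in.id, value)     // both keys map to the same value from now on
        mapping.put(in.name, value)
        out.collect(in.copy(value = value))
      }
    }

    // usage: stream.flatMap(new UnifyValue).setParallelism(1)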
Global state
Is there a global state facility? I need to map the id and name in my data: if two records share the same id or the same name, they should map to the same value. Right now the only way I see is operator state with parallelism 1 to get a global state.
Thanks
Sent from my iPhone
Re: About exactly-once with Async I/O
But the async I/O requests are re-triggered on recovery, so from the external system's point of view it is at least once, right? The docs say: The asynchronous I/O operator offers full exactly-once fault tolerance guarantees. It stores the records for in-flight asynchronous requests in checkpoints and restores/re-triggers the requests when recovering from a failure.
Sent from my iPhone
-- Original Message --
From: Dino Zhang
About exactly-once with Async I/O
Is it at least-once after a failover? Say checkpoint ck1 covers records a, b, c and ck2 covers d, e, f. After ck1 completes, a, b, c have already been sent to the external system. If the job is cancelled during ck2 (say e fails) and restores from ck1, will c and d be requested against the external system again?
Sent from my iPhone
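For context, a minimal async I/O sketch (it assumes an input: DataStream[String]; the lookup call is a hypothetical stand-in for a real client). The exactly-once guarantee applies to Flink's own state: in-flight records are stored in the checkpoint and re-triggered on restore, which is exactly why the external system can see the same request twice, i.e. at-least-once from its point of view:

    import java.util.concurrent.TimeUnit
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
    import scala.concurrent.{ExecutionContext, Future}

    class EnrichFunction extends AsyncFunction[String, (String, String)] {
      implicit lazy val ec: ExecutionContext = ExecutionContext.global

      override def asyncInvoke(key: String, resultFuture: ResultFuture[(String, String)]): Unit =
        Future {
          val v = lookup(key)                        // may be re-issued after a restore
          resultFuture.complete(Iterable((key, v)))
        }

      private def lookup(key: String): String = s"value-of-$key"  // hypothetical external call
    }

    // in-flight elements are checkpointed and re-triggered on recovery
    val enriched = AsyncDataStream.unorderedWait(input, new EnrichFunction, 1, TimeUnit.SECONDS, 100)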