Re: env.readFile recursively monitoring a directory: how to clean up state (historical directories)

2020-06-18, posted by star
Thanks for the suggestion! If I delete the HDFS directory, will the corresponding state in Flink be cleared as well?



Sent from my iPhone


-- Original message --
From: Jark Wu — https://issues.apache.org/jira/browse/FLINK-18357
My initial idea is to have an inactive-interval that marks a subdirectory as one that will no longer produce new files, so that the checkpoint
no longer has to track all the files under that subdirectory.


Best,
Jark

On Wed, 17 Jun 2020 at 14:04, star <3149768...@qq.com> wrote:



 env.readFile(format,path, FileProcessingMode.PROCESS_CONTINUOUSLY, 6)


 The above is a source that monitors data in a directory.
 The format is configured to recursively monitor a parent directory A, whose subdirectories are date directories, e.g.:


 A/20200101/
 A/20200102/
 A/20200103/
 ...
 ...



 
As time goes on, by June nearly 200 directories need to be monitored, each containing about 500 files, so every checkpoint has to sync the consumption offsets of 200*500 files. As a result, checkpoints frequently time out.


 Is there a way to clean up this historical state? On the business side, only the directories from the last 7 days receive new data, so the older ones no longer need to be monitored.
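
One possible workaround, sketched here rather than taken from the thread: the FileInputFormat passed to env.readFile accepts a FilePathFilter, so date directories older than a cutoff can be excluded from enumeration. The RecentDaysFilter class below is hypothetical, it assumes the filter is consulted for directories as well as the files inside them, and it is not confirmed here that this also shrinks state that has already accumulated.

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.util.Try
import org.apache.flink.api.common.io.FilePathFilter
import org.apache.flink.core.fs.Path

// Hypothetical filter: ignore anything that belongs to a date directory older than `days` days.
class RecentDaysFilter(days: Int) extends FilePathFilter {

  // Names such as 20200101 parse with BASIC_ISO_DATE; non-date names simply don't match.
  private def isOldDateDir(name: String): Boolean =
    Try(LocalDate.parse(name, DateTimeFormatter.BASIC_ISO_DATE)).toOption
      .exists(_.isBefore(LocalDate.now().minusDays(days)))

  // Returning true excludes the path from monitoring.
  override def filterPath(path: Path): Boolean =
    isOldDateDir(path.getName) ||
      (path.getParent != null && isOldDateDir(path.getParent.getName))
}

// format is the FileInputFormat from the snippet above:
// format.setNestedFileEnumeration(true)
// format.setFilesFilter(new RecentDaysFilter(7))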

env.readFile recursively monitoring a directory: how to clean up state (historical directories)

2020-06-17, posted by star



env.readFile(format,path, FileProcessingMode.PROCESS_CONTINUOUSLY, 6)


The above is a source that monitors data in a directory.
The format is configured to recursively monitor a parent directory A, whose subdirectories are date directories, e.g.:


A/20200101/
A/20200102/
A/20200103/
...
...


As time goes on, by June nearly 200 directories need to be monitored, each containing about 500 files, so every checkpoint has to sync the consumption offsets of 200*500 files. As a result, checkpoints frequently time out.


Is there a way to clean up this historical state? On the business side, only the directories from the last 7 days receive new data, so the older ones no longer need to be monitored.

Re: TTL …

2020-06-10, posted by star
… the sink …
… the query is roughly select count(distinct product) from order …
… around 2100w (21 million) distinct values …








---------------- Original message ----------------
From: "Yichao Yang" <1048262...@qq.com>
Sent: 10 Jun 2020, 2:45
To: "user-zh"

TTL …

2020-06-10, posted by star
…
… 0:00 …
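
For reference, a minimal sketch, not taken from the thread, of the Flink 1.10-style way to bound the state of an unbounded group-by aggregation such as count(distinct ...) via idle state retention; the retention values are placeholders.

import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)

// State for a group key that stays idle longer than the maximum retention time becomes
// eligible for cleanup; the values are placeholders (max must exceed min by at least 5 minutes).
tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(25), Time.hours(49))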

Re: flink 1.9 custom UDAF … state …

2020-06-07, posted by star
…




---------------- Original message ----------------
From: "Benchao Li"

Re: flink 1.9 custom UDAF … state …

2020-06-07, posted by star
… the UDAF … uses an arrayList … the arraylist … checkpoint …




---------------- Original message ----------------
From: "Benchao Li"

flink 1.9 custom UDAF … state …

2020-06-07, posted by star
…


flink 1.9 … custom UDAF … state …


… SQL … stage …


import org.apache.flink.table.functions.AggregateFunction

// The accumulator holds an arrayList, as described in the thread (element type assumed).
class MyAcc { val values = new java.util.ArrayList[String]() }
class MyFunc extends AggregateFunction[Long, MyAcc] {
  override def createAccumulator(): MyAcc = new MyAcc
  def accumulate(acc: MyAcc, value: String): Unit = acc.values.add(value)
  override def getValue(acc: MyAcc): Long = acc.values.size().toLong
  def merge(acc: MyAcc, it: java.lang.Iterable[MyAcc]): Unit = {
    val iter = it.iterator(); while (iter.hasNext) acc.values.addAll(iter.next().values)
  }
}
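
A minimal sketch of registering and using such a function, assuming a StreamTableEnvironment named tableEnv; the function name, table and columns are placeholders, not from the thread. With the Table/SQL API the accumulator returned by createAccumulator is kept in the aggregation operator's keyed state, so it is included in checkpoints and restored on recovery.

import org.apache.flink.api.scala._        // implicit TypeInformation for result/accumulator types
import org.apache.flink.table.api.scala._

// "myFunc", "mytable" and the columns are placeholder names.
tableEnv.registerFunction("myFunc", new MyFunc)
val result = tableEnv.sqlQuery("SELECT city, myFunc(id) FROM mytable GROUP BY city")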

Re: flink1.9 Sql … state …

2020-06-07, posted by star
…


… restore … after the restore …


… error …


---------------- Original message ----------------
From: "Benchao Li"

Re: flink1.9 Sql … state …

2020-06-05, posted by star




---------------- Original message ----------------
From: "star" <3149768...@qq.com>
Sent: 5 Jun 2020, 10:40
To: "user-zh@flink.apache.org"

Re: flink1.9 Sql … state …

2020-06-04, posted by star
…



---------------- Original message ----------------
From: "zhiyezou" <1530130...@qq.com>
Sent: 5 Jun 2020, 10:31
To: "user-zh"

flink1.9 Sql … state …

2020-06-04, posted by star
Using flink 1.9 with the blink planner.


The job counts distinct ids per month and city and writes the result to the HBase table monthtable, with rowkey month+city.


It then aggregates the monthly counts per city and writes the result to the HBase table totalTable, with rowkey city.


… after the job restarts and restores from the checkpoint … will the cnt in totalTable and monthtable … ?







import java.util
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val bsSettings =
  EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, bsSettings)


val myDataStream = ... // the source is elided in the original message


tableEnv.registerDataStream("monthtable", myDataStream, 'month, 'city, 'id)


// distinct ids per month and city
val monthCount = tableEnv.sqlQuery(
  s"""
     |select month, city, count(distinct id) as cnt
     |from monthtable group by month, city
   """.stripMargin)


// write to HBase, rowkey = month+city
// MyHbaseSink is the poster's own HBase sink implementation (not shown)
monthCount.toRetractStream[Row].filter(_._1).map { line =>
  val row = line._2
  val month = row.getField(0).toString
  val city = row.getField(1).toString
  val cnt = row.getField(2).toString
  val map = new util.HashMap[String, String]()
  map.put("cnt", cnt)
  (month + city, map) // month+city is the rowkey, cnt the column
}.addSink(new MyHbaseSink("monthHbaseTable"))


// register the monthly aggregation as table monthStat
tableEnv.registerTable("monthStat", monthCount)


// total count per city
val totalCount = tableEnv.sqlQuery(
  s"""
     |select city, sum(cnt) as cityCnt
     |from monthStat group by city
   """.stripMargin)


// write to HBase, rowkey = city
totalCount.toRetractStream[Row].filter(_._1).map { line =>
  val row = line._2
  val city = row.getField(0).toString
  val totalCnt = row.getField(1).toString
  val map = new util.HashMap[String, String]()
  map.put("totalCnt", totalCnt)
  (city, map)
}.addSink(new MyHbaseSink("totalHbaseTable"))

Re: yarn-session … job …

2020-06-04, posted by star
… -n … 1.10 …




---------------- Original message ----------------
From: ""

Re: flink 1.9 …

2020-06-03, posted by star
… append … state …
… distinct … append …
… append …




---------------- Original message ----------------
From: "LakeShen" — https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=147427289#FLIP105:SupporttoInterpretChangeloginFlinkSQL(IntroducingDebeziumandCanalFormat)-CanalFormat

 1048262223 <1048262...@qq.com> wrote on 3 Jun 2020, 2:59:

 > Hi
 > Flink … RetractStream …
 > … a sink that takes updates … kafka … the kafka sink … RetractStream …
 >
 > Best,
 > Yichao Yang
 >
 > ---------------- Original message ----------------
 > From: "star" <3149768...@qq.com>
 > Sent: 3 Jun 2020, 2:47
 > To: "user-zh@flink.apache.org"

Re: flink 1.9 …

2020-06-03, posted by star


… the query is: select year,month,day,province,sub_name,sum(amount),count(*) as cou from
mytable group by year,month,day,province,sub_name;


… kafka … topic … table …


… table …




---------------- Original message ----------------
From: "godfrey he" — https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=147427289#FLIP105:SupporttoInterpretChangeloginFlinkSQL(IntroducingDebeziumandCanalFormat)-CanalFormat

1048262223 <1048262...@qq.com> wrote on 3 Jun 2020, 2:59:

 Hi
 Flink … RetractStream …
 … a sink that takes updates … kafka … the kafka sink … RetractStream …

 Best,
 Yichao Yang

 ---------------- Original message ----------------
 From: "star" <3149768...@qq.com>
 Sent: 3 Jun 2020, 2:47
 To: "user-zh@flink.apache.org"

flink 1.9 …

2020-06-03, posted by star



… the result is converted with toRetractStream and should be written to kafka …
… does the kafka … in flink …
… support a RetractStream …
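
A minimal sketch, not taken from the thread, of the workaround commonly used with Flink 1.9: keep only the accumulate messages of the retract stream and write them to Kafka as an append-only stream of latest values. The topic name, broker address and the plain toString serialization are placeholders.

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

def writeToKafka(result: Table): Unit = {
  val props = new Properties()
  props.setProperty("bootstrap.servers", "localhost:9092") // placeholder broker

  result.toRetractStream[Row]
    .filter(_._1)        // keep accumulate messages, drop retract messages
    .map(_._2.toString)  // placeholder serialization of the row
    .addSink(new FlinkKafkaProducer[String]("result-topic", new SimpleStringSchema(), props))
}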






Re: Flink1.9 yarn session … Ask timed out

2020-06-02, posted by star
… JM …




---------------- Original message ----------------
From: "Benchao Li"

Flink1.9 yarn session … Ask timed out

2020-06-01, posted by star
…

Flink1.9 yarn session reports Ask timed out.


Running flink list from the shell gives the following error:
akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/dispatcher#180439734]] after [1 ms]. Message of 
type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical 
reason for `AskTimeoutException` is that the recipient actor didn't send a 
reply.




Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.rest.util.RestClientException: [Job cancellation timed 
out.]





The web UI shows: Server Response Message: Internal server error.
 









 

Re: JSON-parsing UDF for Flink SQL

2020-05-29, posted by star
Thank you



Sent from my iPhone


-- Original message --
From: Benchao Li — https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=141724550

star <3149768...@qq.com> wrote on Fri, 29 May 2020 at 2:34 PM:

 Does Flink SQL 1.10 have a UDF for parsing JSON? I couldn't find one.

 Sent from my iPhone



-- 

Best,
Benchao Li

JSON-parsing UDF for Flink SQL

2020-05-29, posted by star
Does Flink SQL 1.10 have a UDF for parsing JSON? I couldn't find one.

Sent from my iPhone
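
Since built-in JSON functions were not yet available in Flink SQL at that time (the FLIP linked above tracks them), a common stopgap is a small scalar UDF. Below is a minimal sketch of my own, not from the thread, assuming Jackson is on the classpath; the function name json_get and its field-per-call shape are assumptions.

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.table.functions.ScalarFunction
import scala.util.Try

// json_get(json, field) returns the given top-level field as text, or null if absent/unparseable.
class JsonGet extends ScalarFunction {
  @transient private lazy val mapper = new ObjectMapper()

  def eval(json: String, field: String): String =
    Try(mapper.readTree(json).get(field)).toOption
      .flatMap(Option(_))
      .map(_.asText())
      .orNull
}

// Registration and use (names are placeholders):
// tableEnv.registerFunction("json_get", new JsonGet)
// tableEnv.sqlQuery("SELECT json_get(payload, 'user_id') FROM mytable")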

Re: Re: global state

2020-05-27, posted by star
Thanks for the suggestion, but that way the data gets duplicated (doubled). For now it looks like I can only rely on an external store.



Sent from my iPhone


-- Original message --
From: a773807...@gmail.com — https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/api_concepts.html#define-keys-using-key-selector-functions

Best,
tison.


star <3149768...@qq.com> wrote on Tue, 26 May 2020 at 6:42 PM:

>
> Is there a global state facility? I need to map the id and name in my data: if two records have the same id or the same name, they should map to the same value. Right now I can only use operator
> state with parallelism set to 1 to get a global state.
>
>
> Thanks
>
> Sent from my iPhone

Re: global state

2020-05-26, posted by star
Thanks for the reply. Implementing this with keyBy is tricky in the details.
id:1,name:A,value:A1
id:2,name:A,value:A2
id:1,name:B,value:A3


Of the three records above, the first one arrives first; when the later two arrive, their id or name matches the first record's id or name, so their value should take the first record's value.
Output:
id:1,name:A,value:A1
id:2,name:A,value:A1
id:1,name:B,value:A1


My original idea was to store a map whose key is the id or the name and whose value is the corresponding value; for the example above, the MapState would hold these key/value pairs:


key:1,value:A1
key:A,value:A1
key:2,value:A1
key:B,value:A1



Sent from my iPhone
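
A minimal sketch of the approach described above, written as my own illustration rather than code from the thread: a constant key sends every record to a single parallel instance, which keeps the id/name mapping in MapState so it is checkpointed. The Event record shape and the constant key are assumptions.

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Record shape assumed for illustration.
case class Event(id: String, name: String, value: String)

def unify(stream: DataStream[Event]): DataStream[Event] =
  stream
    .keyBy(_ => "all") // constant key: one instance sees all records, like the parallelism-1 setup above
    .process(new KeyedProcessFunction[String, Event, Event] {
      @transient private var mapping: MapState[String, String] = _

      override def open(parameters: Configuration): Unit = {
        mapping = getRuntimeContext.getMapState(
          new MapStateDescriptor[String, String]("id-name-mapping", classOf[String], classOf[String]))
      }

      override def processElement(e: Event,
                                  ctx: KeyedProcessFunction[String, Event, Event]#Context,
                                  out: Collector[Event]): Unit = {
        // Reuse the value already recorded for this id or name, if any; otherwise keep e.value.
        val v = Option(mapping.get(e.id)).orElse(Option(mapping.get(e.name))).getOrElse(e.value)
        mapping.put(e.id, v)
        mapping.put(e.name, v)
        out.collect(e.copy(value = v))
      }
    })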


-- Original message --
From: tison — https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/api_concepts.html#define-keys-using-key-selector-functions

Best,
tison.


star <3149768...@qq.com> wrote on Tue, 26 May 2020 at 6:42 PM:

 Is there a global state facility? I need to map the id and name in my data: if two records have the same id or the same name, they should map to the same value. Right now I can only use operator
 state with parallelism set to 1 to get a global state.


 Thanks

 Sent from my iPhone

Global state

2020-05-26, posted by star
Is there a global state facility? I need to map the id and name in my data: if two records have the same id or the same name, they should map to the same value. Right now I can only use operator
state with parallelism set to 1 to get a global state.


Thanks

Sent from my iPhone

Re: About Async I/O and exactly-once

2019-09-03, posted by star
… so is async I/O …
… at least once …? The docs say: "The
asynchronous I/O operator offers full exactly-once fault tolerance guarantees.
It stores the records for in-flight asynchronous requests in checkpoints and
restores/re-triggers the requests when recovering from a failure."

Sent from my iPhone


-- Original message --
From: Dino Zhang

About Async I/O and exactly-once

2019-09-03, posted by star
… on failover … is it effectively at
least-once …?
… checkpoint 1 covers records a, b,
c, and checkpoint 2 covers d, e, f; checkpoint 1 has completed, so a,
b, c ….


… if the job is cancelled at e before checkpoint 2 completes, it restores from checkpoint 1 … c … d … are triggered again …




Sent from my iPhone
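
For reference, a minimal sketch of the async I/O operator discussed here; it is my own illustration, and queryExternalSystem is a hypothetical blocking lookup. Because in-flight requests stored in a checkpoint are re-triggered on recovery, the external system may see the same request more than once even though the operator's output stream stays exactly-once.

import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

class LookupFunction extends AsyncFunction[String, String] {
  @transient implicit lazy val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical blocking lookup against the external system.
  private def queryExternalSystem(key: String): String = ???

  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit =
    Future(queryExternalSystem(input)).onComplete {
      case Success(v) => resultFuture.complete(Iterable(v))
      case Failure(t) => resultFuture.completeExceptionally(t)
    }
}

// Timeout and capacity values are placeholders.
def enrich(input: DataStream[String]): DataStream[String] =
  AsyncDataStream.unorderedWait(input, new LookupFunction, 1000, TimeUnit.MILLISECONDS, 100)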