??????????????????????????????????????????????????????



------------------ 原始邮件 ------------------
发件人: "zhiyezou"<[email protected]>;
发送时间: 2020年6月5日(星期五) ????10:31
收件人: "user-zh"<[email protected]>;

????:&nbsp;??????flink1.9 Sql ????????????????????????state??????



Hi


??????????????????????????????????????????????????????????????????????TTL




------------------ 原始邮件 ------------------
发件人: "star"<[email protected]>;
发送时间: 2020年6月5日(星期五) ????10:22
收件人: "[email protected]"<[email protected]>;

主题: flink1.9 Sql ????????????????????????state??????



?????? ????????flink 1.9??blink planner



???? ?? ?????? ????id ????????????????????????
????????????????hbase????monthtable
rowkey??month+city




?????????????????? ????city???? ??????hbase????totalTable ??rowkey?? city






????????18?????? ????????restore??ck??????????????????
????????cnt??????????????????totalTable??monthtable??????????????????????????????????????


????????????




// Streaming environment; the checkpoint config is set to exactly-once mode.
// NOTE(review): nothing in this snippet calls env.enableCheckpointing(...) —
// confirm checkpointing is actually enabled elsewhere in the job.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// Table environment built on the Blink planner in streaming mode.
val bsSettings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tableEnv = StreamTableEnvironment.create(env, bsSettings)


myDataStream=......




// Expose the stream as table "monthtable" with fields month, city and id.
tableEnv.registerDataStream("monthtable", myDataStream, 'month, 'city, 'id)




// Number of distinct ids per (month, city).
// The HTML-entity residue that had leaked into the SQL text is removed, and
// every line carries a '|' margin so stripMargin yields clean SQL.
val monthCount = tableEnv.sqlQuery(
  """
    |select month, city, count(distinct id) as cnt
    |from monthtable
    |group by month, city
  """.stripMargin)


// Write the per-(month, city) counts to HBase table "monthHbaseTable";
// the row key is month + city and the count is stored under column "cnt".
monthCount.toRetractStream[Row]
  .filter(_._1) // keep only records whose retract flag is true (add messages)
  .map { line =>
    val row = line._2
    val month = row.getField(0).toString
    val city = row.getField(1).toString
    val cnt = row.getField(2).toString
    val map = new util.HashMap[String, String]()
    map.put("cnt", cnt)
    (month + city, map) // (row key, column -> value)
  }
  .addSink(new MyHbaseSink("monthHbaseTable")) // closing parenthesis was missing






// Register the per-(month, city) result as table "monthStat" so that it can
// be queried by a further SQL statement.
tableEnv.registerTable("monthStat", monthCount)


// Per-city total: sum of the monthly distinct-id counts from "monthStat".
// The HTML-entity residue is removed from the SQL text and every line carries
// a '|' margin so stripMargin yields clean SQL.
val totalCount = tableEnv.sqlQuery(
  """
    |select city, sum(cnt) as cityCnt
    |from monthStat
    |group by city
  """.stripMargin)


// Write the per-city totals to HBase table "totalHbaseTable";
// the row key is the city and the total is stored under column "totalCnt".
totalCount.toRetractStream[Row]
  .filter(_._1) // keep only records whose retract flag is true (add messages)
  .map { line =>
    val row = line._2
    val city = row.getField(0).toString
    val totalCnt = row.getField(1).toString
    val map = new util.HashMap[String, String]()
    map.put("totalCnt", totalCnt)
    (city, map) // (row key, column -> value)
  }
  // addSink needs a sink function, not a table-name String; use MyHbaseSink
  // the same way the "monthHbaseTable" sink does.
  .addSink(new MyHbaseSink("totalHbaseTable"))

回复