????????????????????????????



------------------ 原始邮件 ------------------
发件人: "star"<[email protected]>;
发送时间: 2020年6月5日(星期五) ????10:40
收件人: "[email protected]"<[email protected]>;

????:&nbsp;??????flink1.9 Sql ????????????????????????state??????



??????????????????????????????????????????????????????




------------------ 原始邮件 ------------------
发件人: "zhiyezou"<[email protected]>;
发送时间: 2020年6月5日(星期五) ????10:31
收件人: "user-zh"<[email protected]>;

????:&nbsp;??????flink1.9 Sql ????????????????????????state??????



Hi


??????????????????????????????????????????????????????????????????????TTL




------------------ 原始邮件 ------------------
发件人: "star"<[email protected]>;
发送时间: 2020年6月5日(星期五) ????10:22
收件人: "[email protected]"<[email protected]>;

????:flink1.9 Sql ????????????????????????state??????



????????????????flink 1.9??blink planner



???? ?? ?????? ????id; ???????????????????????? 
????????????????hbase????monthtable;rowkey??month+city




?????????????????? ????city???? ??????hbase????totalTable ??rowkey?? city






????????18?????? ????????restore??ck??????????????????&amp;amp;nbsp; 
????????cnt??????????????????totalTable??monthtable??????????????????????????????????????


????????????




// Streaming environment; the checkpoint mode is set to exactly-once.
// NOTE(review): no enableCheckpointing(...) call is visible in this snippet —
// confirm checkpointing is actually switched on elsewhere.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// Table environment backed by the Blink planner in streaming mode.
val bsSettings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tableEnv = StreamTableEnvironment.create(env, bsSettings)


myDataStream=......




// Register the stream as table "monthtable" with fields month, city and id.
// The original had a stray quote after 'month, which made the lexer read `','`
// as a Char literal and broke the field list.
tableEnv.registerDataStream("monthtable", myDataStream, 'month, 'city, 'id)




// Count distinct ids per (month, city).
val monthCount = tableEnv.sqlQuery(
  """
    |select month,city,count(distinct id) as cnt from monthtable group by month,city
    |""".stripMargin)


// Write the per-(month, city) count to HBase; the row key is month+city.
// filter(_._1) keeps only the records whose Boolean flag is true and drops the
// rest of the retract stream.
monthCount.toRetractStream[Row]
  .filter(_._1)
  .map(line => {
    val row = line._2
    val month = row.getField(0).toString
    val city = row.getField(1).toString
    val cnt = row.getField(2).toString
    val map = new util.HashMap[String, String]()
    map.put("cnt", cnt)
    (month + city, map) // month+city is the rowkey, cnt is the column
  })
  .addSink(new MyHbaseSink("monthHbaseTable"))






// Register the monthly result as table "monthStat" so it can be queried again.
tableEnv.registerTable("monthStat", monthCount)


// Sum the monthly counts per city.
val totalCount = tableEnv.sqlQuery(
  """
    |select city,sum(cnt) as cityCnt from monthStat group by city
    |""".stripMargin)


// Write the per-city total to HBase; the row key is city.
// filter(_._1) keeps only the records whose Boolean flag is true and drops the
// rest of the retract stream.
totalCount.toRetractStream[Row]
  .filter(_._1)
  .map(line => {
    val row = line._2
    val city = row.getField(0).toString
    val totalCnt = row.getField(1).toString
    val map = new util.HashMap[String, String]()
    map.put("totalCnt", totalCnt)
    (city, map)
  })
  // The original passed the bare table name to addSink; wrap it in MyHbaseSink
  // as the monthly sink does.
  // NOTE(review): assumes MyHbaseSink(tableName) is the intended sink here — confirm.
  .addSink(new MyHbaseSink("totalHbaseTable"))

回复