??????????????????????????????????????????????????????
------------------ ???????? ------------------ ??????: "zhiyezou"<[email protected]>; ????????: 2020??6??5??(??????) ????10:31 ??????: "user-zh"<[email protected]>; ????: ??????flink1.9 Sql ????????????????????????state?????? Hi ??????????????????????????????????????????????????????????????????????TTL ------------------&nbsp;????????&nbsp;------------------ ??????:&nbsp;"star"<[email protected]&gt;; ????????:&nbsp;2020??6??5??(??????) ????10:22 ??????:&nbsp;"[email protected]"<[email protected]&gt;; ????:&nbsp;flink1.9 Sql ????????????????????????state?????? ??????&amp;nbsp; &amp;nbsp;????????flink 1.9??blink planner ???? ?? ?????? ????id&amp;nbsp; ???????????????????????? ????????????????hbase????monthtable&amp;nbsp; &amp;nbsp;rowkey??month+city ?????????????????? ????city???? ??????hbase????totalTable ??rowkey?? city ????????18?????? ????????restore??ck??????????????????&amp;nbsp; ????????cnt??????????????????totalTable??monthtable?????????????????????????????????????? ???????????? val env = StreamExecutionEnvironment.getExecutionEnvironment env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val tableEnv = StreamTableEnvironment.create(env, bsSettings) myDataStream=...... tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id) //??????????????????????id??????id??city??????????????????id?????????? val monthCount = tableEnv.sqlQuery( &amp;nbsp; &amp;nbsp; &amp;nbsp; s""" &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;select month,city,count(distinct id) as cnt from monthtable&amp;nbsp; group by month,city &amp;nbsp; &amp;nbsp; &amp;nbsp; """.stripMargin) //??????????????????hbase??rokey??month+city monthCount.toRetractStream[Row].filter(_._1).map(line=&amp;gt;{ &amp;nbsp; &amp;nbsp; &amp;nbsp; val row=line._2 &amp;nbsp; &amp;nbsp; &amp;nbsp; val month=row.getField(0).toString &amp;nbsp; &amp;nbsp; &amp;nbsp; val city=row.getField(1).toString &amp;nbsp; &amp;nbsp; &amp;nbsp; val cnt=row.getField(2).toString &amp;nbsp; &amp;nbsp; &amp;nbsp; val map=new util.HashMap[String,String]() &amp;nbsp; &amp;nbsp; &amp;nbsp; map.put("cnt",cnt) &amp;nbsp; &amp;nbsp; &amp;nbsp; (month+city,map)// month+city??rowkey&amp;nbsp; cnt??????column &amp;nbsp; &amp;nbsp; }).addSink(new MyHbaseSink("monthHbaseTable") &amp;nbsp;//?????????????????????? monthStat &amp;nbsp;tableEnv.registerTable("monthStat",monthCount) &amp;nbsp;//??????????id?????? &amp;nbsp;val totalCount = tableEnv.sqlQuery( &amp;nbsp; &amp;nbsp; &amp;nbsp; s""" &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;|select city,sum(cnt) as cityCnt from monthStat&amp;nbsp; group by city &amp;nbsp; &amp;nbsp; &amp;nbsp; """.stripMargin) //??????????????????hbase??rokey??city &amp;nbsp; &amp;nbsp; totalCount.toRetractStream[Row].filter(_._1).map(line=&amp;gt;{ &amp;nbsp; &amp;nbsp; &amp;nbsp; val row=line._2 &amp;nbsp; &amp;nbsp; &amp;nbsp; val city=row.getField(0).toString &amp;nbsp; &amp;nbsp; &amp;nbsp; val totalCnt=row.getField(1).toString &amp;nbsp; &amp;nbsp; &amp;nbsp; val map=new util.HashMap[String,String]() &amp;nbsp; &amp;nbsp; &amp;nbsp; map.put("totalCnt",totalCnt) &amp;nbsp; &amp;nbsp; &amp;nbsp; (city,map) &amp;nbsp; &amp;nbsp; }).addSink("totalHbaseTable")
