????????????????????????????
------------------ ???????? ------------------ ??????: "star"<[email protected]>; ????????: 2020??6??5??(??????) ????10:40 ??????: "[email protected]"<[email protected]>; ????: ??????flink1.9 Sql ????????????????????????state?????? ?????????????????????????????????????????????????????? ------------------ ???????? ------------------ ??????: "zhiyezou"<[email protected]>; ????????: 2020??6??5??(??????) ????10:31 ??????: "user-zh"<[email protected]>; ????: ??????flink1.9 Sql ????????????????????????state?????? Hi ??????????????????????????????????????????????????????????????????????TTL ------------------&nbsp;????????&nbsp;------------------ ??????:&nbsp;"star"<[email protected]&gt;; ????????:&nbsp;2020??6??5??(??????) ????10:22 ??????:&nbsp;"[email protected]"<[email protected]&gt;; ????:flink1.9 Sql ????????????????????????state?????? ????????????????flink 1.9??blink planner ???? ?? ?????? ????id; ???????????????????????? ????????????????hbase????monthtable;rowkey??month+city ?????????????????? ????city???? ??????hbase????totalTable ??rowkey?? city ????????18?????? ????????restore??ck??????????????????&amp;nbsp; ????????cnt??????????????????totalTable??monthtable?????????????????????????????????????? ???????????? val env = StreamExecutionEnvironment.getExecutionEnvironment env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val tableEnv = StreamTableEnvironment.create(env, bsSettings) myDataStream=...... tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id) //??????????????????????id??????id??city??????????????????id?????????? val monthCount = tableEnv.sqlQuery( s""" select month,city,count(distinct id) as cnt from monthtable group by month,city """.stripMargin) //??????????????????hbase??rokey??month+city monthCount.toRetractStream[Row].filter(_._1).map(line=&amp;gt;{ val row=line._2 val month=row.getField(0).toString val city=row.getField(1).toString val cnt=row.getField(2).toString val map=new util.HashMap[String,String]() map.put("cnt",cnt) (month+city,map) // month+city??rowkey cnt??????column }).addSink(new MyHbaseSink("monthHbaseTable") //?????????????????????? monthStat tableEnv.registerTable("monthStat",monthCount) //??????????id?????? val totalCount = tableEnv.sqlQuery( s""" select city,sum(cnt) as cityCnt from monthStat&amp;nbsp; group by city """.stripMargin) //??????????????????hbase??rokey??city totalCount.toRetractStream[Row].filter(_._1).map(line=&amp;gt;{ val row=line._2 val city=row.getField(0).toString val totalCnt=row.getField(1).toString val map=new util.HashMap[String,String]() map.put("totalCnt",totalCnt) (city,map) }).addSink("totalHbaseTable")
