Hi
??????????????????????????????????????????????????????????????????????TTL ------------------ ???????? ------------------ ??????: "star"<[email protected]>; ????????: 2020??6??5??(??????) ????10:22 ??????: "[email protected]"<[email protected]>; ????: flink1.9 Sql ????????????????????????state?????? ??????&nbsp; &nbsp;????????flink 1.9??blink planner ???? ?? ?????? ????id&nbsp; ???????????????????????? ????????????????hbase????monthtable&nbsp; &nbsp;rowkey??month+city ?????????????????? ????city???? ??????hbase????totalTable ??rowkey?? city ????????18?????? ????????restore??ck??????????????????&nbsp; ????????cnt??????????????????totalTable??monthtable?????????????????????????????????????? ???????????? val env = StreamExecutionEnvironment.getExecutionEnvironment env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val tableEnv = StreamTableEnvironment.create(env, bsSettings) myDataStream=...... tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id) //??????????????????????id??????id??city??????????????????id?????????? val monthCount = tableEnv.sqlQuery( &nbsp; &nbsp; &nbsp; s""" &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;select month,city,count(distinct id) as cnt from monthtable&nbsp; group by month,city &nbsp; &nbsp; &nbsp; """.stripMargin) //??????????????????hbase??rokey??month+city monthCount.toRetractStream[Row].filter(_._1).map(line=&gt;{ &nbsp; &nbsp; &nbsp; val row=line._2 &nbsp; &nbsp; &nbsp; val month=row.getField(0).toString &nbsp; &nbsp; &nbsp; val city=row.getField(1).toString &nbsp; &nbsp; &nbsp; val cnt=row.getField(2).toString &nbsp; &nbsp; &nbsp; val map=new util.HashMap[String,String]() &nbsp; &nbsp; &nbsp; map.put("cnt",cnt) &nbsp; &nbsp; &nbsp; (month+city,map)// month+city??rowkey&nbsp; cnt??????column &nbsp; &nbsp; }).addSink(new MyHbaseSink("monthHbaseTable") &nbsp;//?????????????????????? monthStat &nbsp;tableEnv.registerTable("monthStat",monthCount) &nbsp;//??????????id?????? &nbsp;val totalCount = tableEnv.sqlQuery( &nbsp; &nbsp; &nbsp; s""" &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;|select city,sum(cnt) as cityCnt from monthStat&nbsp; group by city &nbsp; &nbsp; &nbsp; """.stripMargin) //??????????????????hbase??rokey??city &nbsp; &nbsp; totalCount.toRetractStream[Row].filter(_._1).map(line=&gt;{ &nbsp; &nbsp; &nbsp; val row=line._2 &nbsp; &nbsp; &nbsp; val city=row.getField(0).toString &nbsp; &nbsp; &nbsp; val totalCnt=row.getField(1).toString &nbsp; &nbsp; &nbsp; val map=new util.HashMap[String,String]() &nbsp; &nbsp; &nbsp; map.put("totalCnt",totalCnt) &nbsp; &nbsp; &nbsp; (city,map) &nbsp; &nbsp; }).addSink("totalHbaseTable")
