??????????????????????????????????????????????????????????????????????????????????????????????
????????????????????????????????????????????????restore????????????????????????restore?????? ????????????????error???? ------------------ ???????? ------------------ ??????: "Benchao Li"<[email protected]>; ????????: 2020??6??5??(??????) ????5:53 ??????: "user-zh"<[email protected]>; ????: Re: flink1.9 Sql ????????????????????????state?????? Hi?? ?????????????????????????????????????????? ????????????????cp??????????????key????????????????key???????????????????????????????????????????? ?????????????????????????????????????????????????????????????????????????????????????????????????????? ??????????????????????????????????????????????????blink planner????????????binary????????????debug???????? ?????????? ?????????????????????????????????????????????????????????????????????????????????????????? star <[email protected]> ??2020??6??5?????? ????4:02?????? > ???????????????????????????? > > > > > ------------------&nbsp;????????&nbsp;------------------ > ??????:&nbsp;"star"<[email protected]&gt;; > ????????:&nbsp;2020??6??5??(??????) ????10:40 > ??????:&nbsp;"[email protected]"<[email protected]&gt;; > > ????:&nbsp;??????flink1.9 Sql ????????????????????????state?????? > > > > ?????????????????????????????????????????????????????? > > > > > ------------------ ???????? ------------------ > ??????:&nbsp;"zhiyezou"<[email protected]&gt;; > ????????:&nbsp;2020??6??5??(??????) ????10:31 > ??????:&nbsp;"user-zh"<[email protected]&gt;; > > ????:&nbsp;??????flink1.9 Sql ????????????????????????state?????? > > > > Hi > > > ??????????????????????????????????????????????????????????????????????TTL > > > > > ------------------&amp;nbsp;????????&amp;nbsp;------------------ > ??????:&amp;nbsp;"star"<[email protected]&amp;gt;; > ????????:&amp;nbsp;2020??6??5??(??????) ????10:22 > ??????:&amp;nbsp;"[email protected]"<[email protected]&amp;gt;; > > ????:flink1.9 Sql ????????????????????????state?????? > > > > ????????????????flink 1.9??blink planner > > > > ???? ?? ?????? 
????id; ???????????????????????? ????????????????hbase????monthtable;rowkey??month+city > > > > > ?????????????????? ????city???? ??????hbase????totalTable ??rowkey?? city > > > > > > > ????????18?????? ????????restore??ck?????????????????? > ????????cnt??????????????????totalTable??monthtable?????????????????????????????????????? > > > ???????????? > > > > > val env = StreamExecutionEnvironment.getExecutionEnvironment > > env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) > val bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > val tableEnv = StreamTableEnvironment.create(env, bsSettings) > > > myDataStream=...... > > > > > tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id) > > > > > //??????????????????????id??????id??city??????????????????id?????????? > val monthCount = tableEnv.sqlQuery( > s""" > select month,city,count(distinct id) as cnt from monthtable group by > month,city > """.stripMargin) > > > //??????????????????hbase??rowkey??month+city > monthCount.toRetractStream[Row].filter(_._1).map(line=>{ > val row=line._2 > val month=row.getField(0).toString > val city=row.getField(1).toString > val cnt=row.getField(2).toString > val map=new util.HashMap[String,String]() > map.put("cnt",cnt) > (month+city,map) // month+city??rowkey cnt??????column > }).addSink(new MyHbaseSink("monthHbaseTable") > > > > > > > //?????????????????????? monthStat > tableEnv.registerTable("monthStat",monthCount) > > > //??????????id?????? 
> val totalCount = tableEnv.sqlQuery( > s""" > select city,sum(cnt) as cityCnt from monthStat group by city > """.stripMargin) > > > //??????????????????hbase??rowkey??city > totalCount.toRetractStream[Row].filter(_._1).map(line=>{ > val row=line._2 > val city=row.getField(0).toString > val totalCnt=row.getField(1).toString > val map=new util.HashMap[String,String]() > map.put("totalCnt",totalCnt) > (city,map) > }).addSink("totalHbaseTable") -- Best, Benchao Li
