??????????????????????????????????????????????????????????????????????????????????????????????


????????????????????????????????????????????????restore????????????????????????restore??????


????????????????error????


------------------ 原始邮件 ------------------
发件人: "Benchao Li"<[email protected]>;
发送时间: 2020年6月5日(星期五) 下午5:53
收件人: "user-zh"<[email protected]>;

????:&nbsp;Re: flink1.9 Sql ????????????????????????state??????



Hi??

??????????????????????????????????????????

????????????????cp??????????????key????????????????key????????????????????????????????????????????

??????????????????????????????????????????????????????????????????????????????????????????????????????
??????????????????????????????????????????????????blink 
planner????????????binary????????????debug????????
??????????

??????????????????????????????????????????????????????????????????????????????????????????

star <[email protected]> 于2020年6月5日周五 下午4:02写道：

&gt; ????????????????????????????
&gt;
&gt;
&gt;
&gt;
&gt; ------------------ 原始邮件 ------------------
&gt; 发件人: "star"<[email protected]>;
&gt; 发送时间: 2020年6月5日(星期五) 上午10:40
&gt; 收件人: "[email protected]"<[email protected]>;
&gt;
&gt; ????:&amp;nbsp;??????flink1.9 Sql ????????????????????????state??????
&gt;
&gt;
&gt;
&gt; ??????????????????????????????????????????????????????
&gt;
&gt;
&gt;
&gt;
&gt; ------------------ 原始邮件 ------------------
&gt; 发件人: "zhiyezou"<[email protected]>;
&gt; 发送时间: 2020年6月5日(星期五) 上午10:31
&gt; 收件人: "user-zh"<[email protected]>;
&gt;
&gt; ????:&amp;nbsp;??????flink1.9 Sql ????????????????????????state??????
&gt;
&gt;
&gt;
&gt; Hi
&gt;
&gt;
&gt; ??????????????????????????????????????????????????????????????????????TTL
&gt;
&gt;
&gt;
&gt;
&gt; ------------------ 原始邮件 ------------------
&gt; 发件人: "star"<[email protected]>;
&gt; 发送时间: 2020年6月5日(星期五) 上午10:22
&gt; 收件人: "[email protected]"<[email protected]>;
&gt;
&gt; ????:flink1.9 Sql ????????????????????????state??????
&gt;
&gt;
&gt;
&gt; ????????????????flink 1.9??blink planner
&gt;
&gt;
&gt;
&gt; ???? ?? ?????? ????id; ???????????????????????? 
????????????????hbase????monthtable;rowkey??month+city
&gt;
&gt;
&gt;
&gt;
&gt; ?????????????????? ????city???? ??????hbase????totalTable ??rowkey?? city
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; ????????18?????? ????????restore??ck??????????????????&amp;amp;amp;nbsp;
&gt; 
????????cnt??????????????????totalTable??monthtable??????????????????????????????????????
&gt;
&gt;
&gt; ????????????
&gt;
&gt;
&gt;
&gt;
&gt; val env = StreamExecutionEnvironment.getExecutionEnvironment
&gt;
&gt; 
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
&gt; val bsSettings =
&gt; 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
&gt; val tableEnv = StreamTableEnvironment.create(env, bsSettings)
&gt;
&gt;
&gt; myDataStream=......
&gt;
&gt;
&gt;
&gt;
&gt; tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id)
&gt;
&gt;
&gt;
&gt;
&gt; //??????????????????????id??????id??city??????????????????id??????????
&gt; val monthCount = tableEnv.sqlQuery(
&gt; &amp;nbsp;s"""
&gt; select month,city,count(distinct id) as cnt from monthtable group by
&gt; month,city
&gt; &amp;nbsp;""".stripMargin)
&gt;
&gt;
&gt; //??????????????????hbase??rokey??month+city
&gt; monthCount.toRetractStream[Row].filter(_._1).map(line=&amp;amp;amp;gt;{
&gt; val row=line._2
&gt; &amp;nbsp;val month=row.getField(0).toString
&gt; val city=row.getField(1).toString
&gt; &amp;nbsp;val cnt=row.getField(2).toString
&gt; val map=new util.HashMap[String,String]()
&gt; &amp;nbsp;map.put("cnt",cnt)
&gt; &amp;nbsp;(month+city,map)&amp;nbsp; &amp;nbsp; &amp;nbsp;// 
month+city??rowkey cnt??????column
&gt; &amp;nbsp;}).addSink(new MyHbaseSink("monthHbaseTable")
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; //?????????????????????? monthStat
&gt; tableEnv.registerTable("monthStat",monthCount)
&gt;
&gt;
&gt; //??????????id??????
&gt; val totalCount = tableEnv.sqlQuery(
&gt; s"""
&gt; select city,sum(cnt) as cityCnt from monthStat&amp;amp;amp;nbsp; group by 
city
&gt; &amp;nbsp;""".stripMargin)
&gt;
&gt;
&gt; //??????????????????hbase??rokey??city
&gt; 
&amp;nbsp;totalCount.toRetractStream[Row].filter(_._1).map(line=&amp;amp;amp;gt;{
&gt; val row=line._2
&gt; val city=row.getField(0).toString
&gt; val totalCnt=row.getField(1).toString
&gt; val map=new util.HashMap[String,String]()
&gt; map.put("totalCnt",totalCnt)
&gt; (city,map)
&gt; &amp;nbsp;}).addSink("totalHbaseTable")



-- 

Best,
Benchao Li

回复