?????? ????????flink 1.9??blink planner
???? ?? ?????? ????id ????????????????????????
????????????????hbase????monthtable rowkey??month+city
?????????????????? ????city???? ??????hbase????totalTable ??rowkey?? city
????????18?????? ????????restore??ck??????????????????
????????cnt??????????????????totalTable??monthtable??????????????????????????????????????
????????????
// Streaming environment for the job.
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Request exactly-once checkpoint semantics.
// NOTE(review): only the checkpointing *mode* is set here; no enableCheckpointing(...)
// call is visible in this snippet - confirm checkpointing is enabled elsewhere.
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// Table environment settings: Blink planner, streaming mode.
val bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, bsSettings)
// Source stream definition is elided in this snippet.
myDataStream=......
// Register the stream as table "monthtable" with fields month, city and id.
tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id)
// Number of distinct ids for each (month, city) group of the registered "monthtable".
// MONTH is on Flink SQL's reserved-keyword list, so the column is back-quoted;
// the result column is still named `month`, followed by city and cnt.
val monthCount = tableEnv.sqlQuery(
  """
    |select `month`, city, count(distinct id) as cnt
    |from monthtable
    |group by `month`, city
    |""".stripMargin)
// Write the per-(month, city) counts to HBase table "monthHbaseTable".
// toRetractStream emits (flag, row) pairs; only pairs whose flag is true
// (add messages) are kept, retract messages are dropped.
// NOTE(review): presumably MyHbaseSink overwrites the cell for an existing rowkey,
// so a later add message replaces the previous count - confirm against the sink.
monthCount
  .toRetractStream[Row]
  .filter(_._1)
  .map { change =>
    val row = change._2
    val month = row.getField(0).toString
    val city = row.getField(1).toString
    val cnt = row.getField(2).toString
    val columns = new util.HashMap[String, String]()
    columns.put("cnt", cnt)
    // rowkey = month + city (plain concatenation), single column "cnt"
    (month + city, columns)
  }
  .addSink(new MyHbaseSink("monthHbaseTable")) // closing parenthesis was missing
// Expose the per-(month, city) counts as table "monthStat" so they can be queried with SQL.
tableEnv.registerTable("monthStat", monthCount)
// Per-city total: the sum of the per-month cnt values of that city.
// NOTE(review): this equals the number of distinct ids per city only if no id
// occurs in more than one month for the same city - confirm against the data.
val totalCount = tableEnv.sqlQuery(
  """
    |select city, sum(cnt) as cityCnt
    |from monthStat
    |group by city
    |""".stripMargin)
// Write the per-city totals to HBase table "totalHbaseTable".
// As with the monthly sink, only add messages (flag == true) of the retract
// stream are forwarded; retract messages are dropped.
totalCount
  .toRetractStream[Row]
  .filter(_._1)
  .map { change =>
    val row = change._2
    val city = row.getField(0).toString
    val totalCnt = row.getField(1).toString
    val columns = new util.HashMap[String, String]()
    columns.put("totalCnt", totalCnt)
    // rowkey = city, single column "totalCnt"
    (city, columns)
  }
  // addSink needs a sink function, not a table-name String; build the sink
  // the same way the monthly pipeline does.
  .addSink(new MyHbaseSink("totalHbaseTable"))