??????   ????????flink 1.9??blink planner


???? ?? ?????? ????id  ???????????????????????? 
????????????????hbase????monthtable   rowkey??month+city




?????????????????? ????city???? ??????hbase????totalTable ??rowkey?? city






????????18?????? ????????restore??ck??????????????????  
????????cnt??????????????????totalTable??monthtable??????????????????????????????????????


????????????




val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, bsSettings)


myDataStream=......




tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id)




//??????????????????????id??????id??city??????????????????id??????????
val monthCount = tableEnv.sqlQuery(
      s"""
         select month,city,count(distinct id) as cnt 
from monthtable  group by month,city
      """.stripMargin)


//??????????????????hbase??rokey??month+city
monthCount.toRetractStream[Row].filter(_._1).map(line=>{
      val row=line._2
      val month=row.getField(0).toString
      val city=row.getField(1).toString
      val cnt=row.getField(2).toString
      val map=new util.HashMap[String,String]()
      map.put("cnt",cnt)
      (month+city,map)// month+city??rowkey  cnt??????column
    }).addSink(new MyHbaseSink("monthHbaseTable")






 //?????????????????????? monthStat
 tableEnv.registerTable("monthStat",monthCount)


 //??????????id??????
 val totalCount = tableEnv.sqlQuery(
      s"""
         |select city,sum(cnt) as cityCnt from 
monthStat  group by city
      """.stripMargin)


//??????????????????hbase??rokey??city
    totalCount.toRetractStream[Row].filter(_._1).map(line=>{
      val row=line._2
      val city=row.getField(0).toString
      val totalCnt=row.getField(1).toString
      val map=new util.HashMap[String,String]()
      map.put("totalCnt",totalCnt)
      (city,map)
    }).addSink("totalHbaseTable")

回复