??????????????????Flink 
processFunction??????state??????????????????????????????????????????????????flatmap????????????????endTime????????????????key+endTime????keyby????processFunction
 open()????????????value 
state??????processElement()????????????state??Timer????ontimer()??????state????????????????????state??????????????????
 ??checkpoint 
size??????????????????????????????????????????????8????????checkpoint 
size??????????????????????????????????1.6??G????????????????????????state??????????????????????????????????????
 ???????? job????processFunction???? val stream = 
env.addSource(consumer).uid("DianDianUserAppViewCount_Source")
  // Parse every raw record and keep only "track" events of project 6 that carry a
  // truthy `$is_login_id` property and are not older than `startFromTimestamp`.
  // Anything else (including records that fail to parse/extract) becomes None.
  .map(rawEvent => {
    try {
      val eventJson = parse(rawEvent)
      if ((eventJson \ "type").extract[String] == "track" &&
          (eventJson \ "project_id").extract[Int] == 6 &&
          (eventJson \ "properties" \ "$is_login_id").extract[Boolean] &&
          (eventJson \ "time").extract[Long] >= startFromTimestamp.toLong) Some(eventJson)
      else None
    } catch {
      // Only swallow ordinary failures (malformed JSON, missing fields). Fatal errors
      // such as OutOfMemoryError or InterruptedException must propagate so the task
      // fails/cancels properly instead of silently dropping records.
      case scala.util.control.NonFatal(_) => None
    }
  }).uid("DianDianUserAppViewCount_Map")
  .filter(item => item.isDefined).uid("DianDianUserAppViewCount_Filter")
  // Fan each event out into one RichAppViewEvent per sliding window (90 days long,
  // sliding by 1 day, offset -8h) that contains the event's timestamp.
  // NOTE(review): ProcessAppViewCount only increments its counter when the event lies
  // within the last 30 days before the window end, so most of the 90 records per event
  // merely re-emit the current count and register a cleanup timer. Confirm whether a
  // 90-day window size is really intended; it directly multiplies keyed state/timers.
  .flatMap { item =>
    // Safe: the preceding filter only lets defined options through.
    val eventJson = item.get
    val timestamp = (eventJson \ "time").extract[Long]
    val distinctId = (eventJson \ "distinct_id").extract[String]
    val eventName = (eventJson \ "event").extract[String]
    SlidingEventTimeWindows
      .of(Time.days(90), Time.days(1), Time.hours(-8))
      .assignWindows(null, timestamp, null)
      .map { timeWindow =>
        RichAppViewEvent(
          s"${distinctId}_${timeWindow.getEnd}",
          distinctId,
          eventName,
          timestamp,
          timeWindow.getEnd
        )
      }
  }.uid("DianDianUserAppViewCount_FlatMap")
  // Event time is the original event timestamp; watermarks lag by one hour.
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[RichAppViewEvent](Time.hours(1)) {
      override def extractTimestamp(element: RichAppViewEvent): Long = element.time
    }).uid("DianDianUserAppViewCount_Watermarker")
  // One keyed state per `key` field of RichAppViewEvent.
  .keyBy(_.key)
  .process(new ProcessAppViewCount).uid("DianDianUserAppViewCount_Process")
  .writeUsingOutputFormat(new
HBaseOutputFormat).uid("DianDianUserAppViewCount_Sink")

/**
  * Keeps a running count of `$AppViewScreen` events per key and emits
  * `(distinctId, yyyyMMdd of the window's last millisecond, count)` for every
  * non-late input element.
  *
  * The count is held in keyed `ValueState` and is cleared by an event-time timer
  * registered at the element's `windowEndTime`.
  */
class ProcessAppViewCount
    extends KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)] {

  // 30 days in milliseconds: 1000 * 60 * 60 * 24 * 30
  private val thirtyDaysForTimestamp = 2592000000L

  // Formats the emitted day string; uses the JVM default time zone.
  private val dateFormat = new SimpleDateFormat("yyyyMMdd")

  // Per-key counter, obtained lazily from the runtime context on first access.
  private lazy val appViewCount: ValueState[Int] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[Int]("AppViewCountState", classOf[Int]))

  /**
    * Handles one element: ignores it if it is late, otherwise updates the counter when
    * applicable, emits the current count and registers the state cleanup timer.
    *
    * @param value the incoming event
    * @param ctx   context giving access to the timer service
    * @param out   collector receiving `(distinctId, day, count)`
    */
  override def processElement(
      value: RichAppViewEvent,
      ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context,
      out: Collector[(String, String, Int)]): Unit = {
    if (!isLate(value, ctx)) {
      val storedCount = appViewCount.value()
      // Only events within the 30 days preceding the window end are counted.
      val countingStart = value.windowEndTime - thirtyDaysForTimestamp
      val isCounted =
        value.time >= countingStart && value.event.equals("$AppViewScreen")

      val countToEmit =
        if (isCounted) {
          val incremented = storedCount + 1
          appViewCount.update(incremented)
          incremented
        } else {
          storedCount
        }

      // The count is emitted for every non-late element, whether or not it changed.
      val dayLabel = dateFormat.format(value.windowEndTime - 1)
      out.collect((value.distinctId, dayLabel, countToEmit))

      registerCleanupTimer(value, ctx)
    }
  }

  /**
    * Timer callback: clears the counter state of the current key.
    *
    * @param timestamp the timestamp the timer was registered for
    * @param ctx       timer context (unused)
    * @param out       output collector (unused)
    */
  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#OnTimerContext,
      out: Collector[(String, String, Int)]): Unit =
    Option(appViewCount).foreach(_.clear())

  /**
    * An element is late when the current watermark has already reached the end of the
    * window it was assigned to.
    *
    * @param value the incoming event
    * @param ctx   context giving access to the current watermark
    * @return true if the element must be ignored
    */
  private def isLate(
      value: RichAppViewEvent,
      ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context): Boolean = {
    val watermark = ctx.timerService().currentWatermark()
    watermark >= value.windowEndTime
  }

  /**
    * Registers an event-time timer at the element's window end; when it fires,
    * `onTimer` clears the state.
    *
    * @param value the incoming event
    * @param ctx   context giving access to the timer service
    */
  private def registerCleanupTimer(
      value: RichAppViewEvent,
      ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context): Unit =
    ctx.timerService().registerEventTimeTimer(value.windowEndTime)
}

回复