[The original text of this question was mis-encoded; it is reconstructed here from the surviving words and the code below.] I am using a Flink `ProcessFunction` with keyed state to count users' app page views. A `flatMap` assigns each event to its sliding windows and computes each window's `endTime`. The stream is then keyed by `key + endTime` (`keyBy`) and handled by the `ProcessFunction`. The function keeps a `ValueState` (set up via `open()`). `processElement()` updates the state and registers a timer, and `onTimer()` clears the state when the timer fires, so expired state should be removed.

However, the checkpoint size keeps growing: after the job had been running for 8 days, the checkpoint size reached about 1.6 GB. Why is the state not being cleaned up as expected?
// The job and its ProcessFunction:
val stream = env.addSource(consumer).uid("DianDianUserAppViewCount_Source")
  // Parse each raw record. Keep it only if it is a "track" event of project 6,
  // sent by a logged-in user, and not older than startFromTimestamp.
  // Records that fail to parse or lack these fields become None.
  .map(rawEvent => {
    try {
      val eventJson = parse(rawEvent)
      if ((eventJson \ "type").extract[String] == "track" &&
          (eventJson \ "project_id").extract[Int] == 6 &&
          (eventJson \ "properties" \ "$is_login_id").extract[Boolean] &&
          (eventJson \ "time").extract[Long] >= startFromTimestamp.toLong) {
        Some(eventJson)
      } else {
        None
      }
    } catch {
      // Swallow only non-fatal parse/extraction failures. Fatal errors
      // (OOM, interruption, control-flow throwables) must still propagate.
      case scala.util.control.NonFatal(_) => None
    }
  }).uid("DianDianUserAppViewCount_Map")
  .filter(item => item.isDefined).uid("DianDianUserAppViewCount_Filter")
  // Fan each event out into one record per sliding window that contains it.
  // The windows are 90 days long, slide by 1 day and use a -8h offset.
  // The key is "<distinct_id>_<window end>", so every event produces 90 keys.
  .flatMap { item =>
    val json = item.get // safe: the filter above keeps only defined values
    val timestamp = (json \ "time").extract[Long]
    // Same for every window of this event: extract once, not once per window.
    val distinctId = (json \ "distinct_id").extract[String]
    val eventName = (json \ "event").extract[String]
    SlidingEventTimeWindows
      .of(Time.days(90), Time.days(1), Time.hours(-8))
      .assignWindows(null, timestamp, null)
      .map { timeWindow =>
        RichAppViewEvent(
          s"${distinctId}_${timeWindow.getEnd}",
          distinctId,
          eventName,
          timestamp,
          timeWindow.getEnd
        )
      }
  }.uid("DianDianUserAppViewCount_FlatMap")
  // Event time = the original event's timestamp; allow 1 hour of out-of-orderness.
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[RichAppViewEvent](Time.hours(1)) {
      override def extractTimestamp(element: RichAppViewEvent): Long = element.time
    }).uid("DianDianUserAppViewCount_Watermarker")
  .keyBy(_.key)
  .process(new ProcessAppViewCount).uid("DianDianUserAppViewCount_Process")
  .writeUsingOutputFormat(new HBaseOutputFormat).uid("DianDianUserAppViewCount_Sink")

/**
 * Counts "$AppViewScreen" events for each key ("<distinct_id>_<window end>").
 *
 * Only events whose timestamp is at or after `windowEndTime - 30 days` are counted.
 * For every element that is not late, the function emits
 * (distinctId, yyyyMMdd of the window's last millisecond, current count).
 * When the watermark passes the window end, the key's count is cleared.
 *
 * NOTE(review): a key's state and its timer are kept until the event-time timer at
 * `windowEndTime` fires. Upstream, each event is assigned to 90 one-day-sliding
 * windows whose ends lie up to 90 days after the event. State and timers can
 * therefore live for up to ~90 days, so the checkpoint size is expected to keep
 * growing for roughly that long before it levels off.
 * Windows ending more than 30 days after an event never count it. They still
 * register a timer and emit a record, so shrinking the upstream window size would
 * cut state and output volume. That changes the emitted rows; confirm intent first.
 */
class ProcessAppViewCount extends KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)] {

  // 30 days in milliseconds: 1000 * 60 * 60 * 24 * 30
  private val thirtyDaysForTimestamp = 2592000000L

  // Formats timestamps as yyyyMMdd. This uses the JVM default time zone.
  // NOTE(review): windows are aligned with a -8h offset (UTC+8 day boundaries);
  // confirm the TaskManagers' default time zone is UTC+8, or dates may be off by one.
  private val dateFormat = new SimpleDateFormat("yyyyMMdd")

  // Running count for the current key. An unset state reads as null,
  // which unboxes to 0 as a Scala Int.
  private lazy val appViewCount: ValueState[Int] =
    getRuntimeContext.getState(new ValueStateDescriptor[Int]("AppViewCountState", classOf[Int]))

  /**
   * Updates the key's count with this element and emits the current count.
   * Late elements (window already closed by the watermark) are dropped.
   *
   * @param value the windowed event
   * @param ctx   context used to read the watermark and register the cleanup timer
   * @param out   receives (distinctId, yyyyMMdd date, current count)
   */
  override def processElement(value: RichAppViewEvent,
                              ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context,
                              out: Collector[(String, String, Int)]): Unit = {
    if (!isLate(value, ctx)) {
      // Start of the 30-day counting range that ends at the window end.
      val beforeThirtyDaysStartTime = value.windowEndTime - thirtyDaysForTimestamp
      val previous = appViewCount.value()
      // Count only $AppViewScreen events that fall in the last 30 days of the window.
      val currentValue =
        if (value.time >= beforeThirtyDaysStartTime && value.event.equals("$AppViewScreen")) {
          val incremented = previous + 1
          appViewCount.update(incremented)
          incremented
        } else {
          previous
        }
      // windowEndTime is exclusive, so windowEndTime - 1 falls on the window's last day.
      out.collect((value.distinctId, dateFormat.format(value.windowEndTime - 1), currentValue))
      registerCleanupTimer(value, ctx)
    }
  }

  /**
   * Fires at the key's window end.
   *
   * The key embeds the window end, so this is the only timer per key. Any element
   * that arrives afterwards is late and gets dropped, so the count can be discarded.
   */
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#OnTimerContext,
                       out: Collector[(String, String, Int)]): Unit = {
    appViewCount.clear()
  }

  /**
   * Checks whether the element's window has already been closed by the watermark.
   *
   * @param value the windowed event
   * @param ctx   context used to read the current watermark
   * @return true when the current watermark is at or past the element's window end
   */
  private def isLate(value: RichAppViewEvent,
                     ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context): Boolean =
    ctx.timerService().currentWatermark() >= value.windowEndTime

  /**
   * Registers an event-time timer at the element's window end to clear the key's state.
   * Every element of a key registers the same timestamp.
   *
   * @param value the windowed event
   * @param ctx   context whose timer service receives the timer
   */
  private def registerCleanupTimer(value: RichAppViewEvent,
                                   ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context) = {
    val cleanupTime = value.windowEndTime
    ctx.timerService().registerEventTimeTimer(cleanupTime)
  }
}