各位好,
本人在使用 Flink 的 ProcessFunction 管理 state 数据时产生了一些疑问,想请教各位。
我在流里使用 flatMap 为每条数据分配了多个 endTime,然后按数据的 key + endTime 做 keyBy;在 ProcessFunction 的 open() 里初始化了 ValueState,在 processElement() 里注册了清理 state 的 Timer,并在 onTimer() 里清除 state。
按照我的理解,如果 state 能被定时清理掉,checkpoint size 应该会保持在一个稳定的大小。但是这个流现在已经跑了 8 天,checkpoint size 仍在不断增长,目前已经增长到 1.6 GB。
所以我怀疑是不是管理 state 的代码逻辑写得有问题,请各位指教。代码如下:
Job 以及 ProcessFunction 的代码:
// Pipeline: source -> parse/validate -> drop rejected records -> fan each event out to one
// record per sliding window -> watermarks -> keyed counting -> HBase sink.
//
// NOTE(review): the flatMap below emits one record per window returned by the sliding assigner
// (size 90 days, slide 1 day), each with its own "<distinct_id>_<windowEnd>" key. Keyed state
// and timers created downstream therefore scale with that fan-out and live until the
// corresponding window end is passed by the watermark.
val stream = env.addSource(consumer).uid("DianDianUserAppViewCount_Source")
  // Parse the raw record and keep it only if it is a "track" event of project 6 from a
  // logged-in id whose time is not before startFromTimestamp.
  .map(rawEvent => {
    try {
      val eventJson = parse(rawEvent)
      if ((eventJson \ "type").extract[String] == "track" &&
        (eventJson \ "project_id").extract[Int] == 6 &&
        (eventJson \ "properties" \ "$is_login_id").extract[Boolean] &&
        (eventJson \ "time").extract[Long] >= startFromTimestamp.toLong)
        Some(eventJson)
      else
        None
    } catch {
      // Records that cannot be parsed or lack a required field are dropped on purpose.
      // Only non-fatal errors are swallowed so that OutOfMemoryError, InterruptedException
      // and similar still propagate.
      case scala.util.control.NonFatal(_) => None
    }
  }).uid("DianDianUserAppViewCount_Map")
  .filter(item => item.isDefined).uid("DianDianUserAppViewCount_Filter")
  .flatMap {
    item =>
      // Safe: the preceding filter only lets defined options through.
      val eventJson = item.get
      val timestamp = (eventJson \ "time").extract[Long]
      // Extracted once per event instead of once per assigned window.
      val distinctId = (eventJson \ "distinct_id").extract[String]
      val eventName = (eventJson \ "event").extract[String]
      // The assigner is used only to enumerate the windows this timestamp belongs to;
      // no windowing operator is involved, hence the null element/context arguments.
      SlidingEventTimeWindows
        .of(Time.days(90), Time.days(1), Time.hours(-8))
        .assignWindows(null, timestamp, null)
        .map {
          case timeWindow =>
            RichAppViewEvent(
              s"${distinctId}_${timeWindow.getEnd}",
              distinctId,
              eventName,
              timestamp,
              timeWindow.getEnd
            )
        }
  }.uid("DianDianUserAppViewCount_FlatMap")
  // Event time comes from the record's own time field, allowing 1 hour of out-of-orderness.
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[RichAppViewEvent](Time.hours(1)) {
      override def extractTimestamp(element: RichAppViewEvent): Long = element.time
    }).uid("DianDianUserAppViewCount_Watermarker")
  .keyBy(_.key)
  .process(new ProcessAppViewCount).uid("DianDianUserAppViewCount_Process")
  .writeUsingOutputFormat(new HBaseOutputFormat).uid("DianDianUserAppViewCount_Sink")
/**
 * Keeps a running count of `$AppViewScreen` events per key and emits the current count for
 * every non-late input element.
 *
 * An element increments the count only when its event time lies within the 30 days before its
 * `windowEndTime`. The count is held in keyed [[ValueState]] and is cleared by an event-time
 * timer set to `windowEndTime`.
 *
 * The cleanup timer is registered only when the state is actually written. Elements that do not
 * change the state leave nothing to clean up, so registering a timer for them would only add
 * timer entries to the checkpoint.
 *
 * NOTE(review): state for a key is kept until the watermark reaches that key's `windowEndTime`,
 * so checkpoint size is expected to keep growing until the earliest window ends are passed.
 */
class ProcessAppViewCount
  extends KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)] {

  // 30 days in milliseconds: 1000 * 60 * 60 * 24 * 30
  private val thirtyDaysForTimestamp = 2592000000L

  // Formats the emitted day label.
  // NOTE(review): no time zone is set, so the JVM default zone is used — confirm it matches the
  // zone the window boundaries are aligned to.
  private val dateFormat = new SimpleDateFormat("yyyyMMdd")

  // Per-key counter; resolved lazily on first access because the runtime context is only
  // available once the function has been initialised by the runtime.
  private lazy val appViewCount: ValueState[Int] = getRuntimeContext.getState(
    new ValueStateDescriptor[Int]("AppViewCountState", classOf[Int]))

  /**
   * Updates the counter for the current key if the element qualifies and emits
   * `(distinctId, yyyyMMdd of windowEndTime - 1 ms, current count)`.
   *
   * @param value the incoming element
   * @param ctx   context giving access to the timer service
   * @param out   collector for the emitted tuple
   */
  override def processElement(
      value: RichAppViewEvent,
      ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context,
      out: Collector[(String, String, Int)]): Unit = {
    if (!isLate(value, ctx)) {
      // Start of the 30-day span that ends at the window end.
      val beforeThirtyDaysStartTime = value.windowEndTime - thirtyDaysForTimestamp
      // Count only $AppViewScreen events that fall inside that span.
      val countsTowardsWindow =
        value.time >= beforeThirtyDaysStartTime && value.event.equals("$AppViewScreen")

      // NOTE(review): with no default configured, an unset state is expected to read as 0 here
      // (a null unboxed to a Scala Int) — confirm against the state backend in use.
      val storedCount = appViewCount.value()
      val currentValue = if (countsTowardsWindow) storedCount + 1 else storedCount

      if (countsTowardsWindow) {
        appViewCount.update(currentValue)
        // State now exists for this key, so make sure it gets cleared at the window end.
        // Registering the same timestamp again for the same key does not add another timer.
        registerCleanupTimer(value, ctx)
      }

      // Emit downstream.
      out.collect(
        (value.distinctId, dateFormat.format(value.windowEndTime - 1), currentValue)
      )
    }
  }

  /**
   * Clears the counter of the key whose cleanup timer fired. Nothing is emitted.
   *
   * @param timestamp the timestamp the timer was registered for
   * @param ctx       timer context
   * @param out       collector (unused)
   */
  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#OnTimerContext,
      out: Collector[(String, String, Int)]): Unit = {
    appViewCount.clear()
  }

  /**
   * An element is treated as late, and therefore ignored, when the current watermark has
   * already reached the end time of the window the element was assigned to.
   *
   * @param value the incoming element
   * @param ctx   context giving access to the current watermark
   * @return true if the element must be skipped
   */
  private def isLate(
      value: RichAppViewEvent,
      ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context)
    : Boolean = {
    ctx.timerService().currentWatermark() >= value.windowEndTime
  }

  /**
   * Registers an event-time timer at the element's window end time; the timer triggers the
   * state cleanup in `onTimer`.
   *
   * @param value the incoming element
   * @param ctx   context giving access to the timer service
   */
  private def registerCleanupTimer(
      value: RichAppViewEvent,
      ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context)
    : Unit = {
    val cleanupTime = value.windowEndTime
    ctx.timerService().registerEventTimeTimer(cleanupTime)
  }
}