????????
--????
FLINK 1.10.0 ON YARN
--????
1.???????? .window(TumblingProcessingTimeWindows.of(Time.days(1)))????
2.????????new
Trigger(????.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))
3.????????new
ProcessWindowFunction(),????????????????????????????????0??????????????????????????????????????????
--????
?? new
ProcessWindowFunction()????????ValueState????????????0????????ValueState??????????????????????????ValueState??????????????????????????????????????????.clear()??????????????????????????????????
--????????
.window(TumblingProcessingTimeWindows.of(Time.days(1)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))
.process(new ProcessWindowFunction[(String, String, Long), String, Tuple, TimeWindow] {

  /**
   * Emits the number of "listentime" events seen so far in the current one-day window
   * for `key`, formatted as `<key field 0>_<yyyy-MM-dd>&{"yesterday_foreground_play_pv":"<count>"}`.
   *
   * ContinuousProcessingTimeTrigger fires without purging, so on every firing `elements`
   * holds ALL elements buffered for this key in the current window, not only the ones
   * that arrived since the previous firing. The count is therefore derived directly from
   * `elements`:
   *  - adding it onto a running ValueState on each firing would count earlier elements
   *    again and again;
   *  - state obtained through getRuntimeContext is scoped to the key, not to the window,
   *    so it would leak into the next day's window unless cleared per key;
   *  - a date guard held in a plain field is shared by every key handled by this function
   *    instance, so it can reset at most one key when the day changes.
   * With no state to reset, each new window naturally starts again from 0 for every key.
   *
   * @param key      grouping key; only field 0 is used in the output
   * @param context  window context (unused)
   * @param elements all elements currently buffered for this key and window
   * @param out      collector receiving one formatted string per firing
   */
  override def process(key: Tuple, context: Context,
                       elements: Iterable[(String, String, Long)],
                       out: Collector[String]): Unit = {
    // Comparing from the literal side keeps a null action name from throwing.
    val listenCount = elements.count(element => "listentime" == element._2)

    // Date label is the wall-clock day at firing time in the JVM default time zone.
    // NOTE(review): day-sized tumbling windows without an offset are aligned to the epoch
    // (UTC), so this label can differ from the window's own day near the boundary -- confirm
    // which one downstream consumers expect.
    val date = new SimpleDateFormat("yyyy-MM-dd").format(new Date())

    // Explicit type argument: without it Scala may infer Nothing for the Java generic
    // getField and insert a cast that fails at runtime.
    val keyPart = key.getField[AnyRef](0)

    out.collect(s"""${keyPart}_$date&{"yesterday_foreground_play_pv":"$listenCount"}""")
  }
})