????????

--????
FLINK 1.10.0 ON YARN


--????
1.????????  .window(TumblingProcessingTimeWindows.of(Time.days(1)))????
2.????????new 
Trigger(????.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))
3.????????new 
ProcessWindowFunction(),????????????????????????????????0??????????????????????????????????????????
--????
 ?? new 
ProcessWindowFunction()????????ValueState????????????0????????ValueState??????????????????????????ValueState??????????????????????????????????????????.clear()??????????????????????????????????



--????????


  .window(TumblingProcessingTimeWindows.of(Time.days(1)))   
.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))
.process(new ProcessWindowFunction[(String,String,Long), String, Tuple, 
TimeWindow] {


        private var pv_st: ValueState[Long] = _  
         


        override def open(parameters: Configuration): Unit 
= {
         pv_st = getRuntimeContext.getState[Long](new 
ValueStateDescriptor[Long]("pv_stCount", classOf[Long]))
        }


       override def process(key: Tuple, context: Context, 
elements: Iterable[(String,String,Long)], out: Collector[String]): Unit = {
          var c_st = 0
   
          val elementsIterator = elements.iterator
          // ??????????????????????word
          while (elementsIterator.hasNext) {
            val ac_name = 
elementsIterator.next()._2
            if(!ac_name.isEmpty && 
ac_name.equals("listentime")){
              c_st +=1
            }
          }
          val time: Date = new Date()
          val dateFormat: SimpleDateFormat = new 
SimpleDateFormat("yyyy-MM-dd")
          val date = dateFormat.format(time)


          // add current
         pv_st.update(pv_st.value() + c_st)


          var jsonStr = 
""+key.getField(0)+"_"+date+"&" // json????????
          jsonStr += "{"+
                    
"\"yesterday_foreground_play_pv\":\""+pv_st.value()+
                    "\"}";




          
//??????????????????????????????????????????????????????????????
          if(stateDate.equals("") || 
stateDate.equals(date)){
            stateDate=date
            out.collect(jsonStr)
          }else{
            out.collect(jsonStr)
            pv_st.clear()
            stateDate=date
          }


        }


      })

回复