Hi, I am running a program that monitors the delay of a stream, aggregated per minute. This is my code:

    .map(new RichMapFunction[String, (Long, Int)] {
      override def map(in: String): (Long, Int) = {
        try {
          val arr = in.split("\\|")
          // Key: delay in whole minutes (current time minus the timestamp in field 10, both in seconds).
          // Value: a count of 1 for this record.
          ((System.currentTimeMillis() / 1000 - arr(10).trim.toLong) / 60, 1)
        } catch {
          case e: Exception =>
            // Log the raw record that failed to parse; it is removed by the filter below.
            System.out.println("data has been dropped: " + in)
            null
        }
      }
    }).slotSharingGroup("kafkaSource").setParallelism(200)
    .filter(item => item != null && item._1 >= 0).slotSharingGroup("kafkaSource").setParallelism(200)

    signalSource.keyBy(f => f._1)
      .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
      .reduce { (e1, e2) => (e1._1, e1._2 + e2._2) }.setParallelism(20).slotSharingGroup("Delay")
      .addSink(new OracleSink).setParallelism(1).slotSharingGroup("OracleSink").name("OracleSinkDelay")

But there is a problem: when the data is not delayed, keys 1, 2, 3, 4 and 5 receive so much data that the backpressure is always 1. Is there any way to avoid this? Please give me some advice. Thank you so much.
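
To make the skew concrete, here is a minimal sketch in plain Scala (no Flink) of the same key computation as the map function above. It assumes field 10 of the '|'-separated record is an epoch timestamp in seconds; the names `DelayKeySkew`, `sampleRecord` and `countsByKey`, and the sample data, are hypothetical and only for illustration.

```scala
object DelayKeySkew {
  // Same key computation as in the job: delay in whole minutes.
  // Assumption: field 10 holds an epoch timestamp in seconds.
  def delayMinutes(record: String, nowSeconds: Long): Option[Long] =
    try {
      val arr = record.split("\\|")
      Some((nowSeconds - arr(10).trim.toLong) / 60)
    } catch {
      case _: Exception => None // malformed record, dropped
    }

  // Hypothetical helper: an 11-field record whose last field is the timestamp.
  def sampleRecord(tsSeconds: Long): String =
    (Seq.fill(10)("x") :+ tsSeconds.toString).mkString("|")

  // Count records per delay key, mirroring keyBy + reduce within one window.
  def countsByKey(records: Seq[String], nowSeconds: Long): Map[Long, Int] =
    records
      .flatMap(r => delayMinutes(r, nowSeconds))
      .filter(_ >= 0)
      .groupBy(identity)
      .map { case (k, v) => k -> v.size }

  def main(args: Array[String]): Unit = {
    val now = 1700000000L
    // 1000 records arriving 60 to 359 seconds late, plus one record 2 hours late.
    val records =
      (0 until 1000).map(i => sampleRecord(now - 60 - (i % 300))) :+
        sampleRecord(now - 7200)
    // Print the per-key counts in key order.
    println(countsByKey(records, now).toSeq.sortBy(_._1))
  }
}
```

In this sample almost every record lands on one of the keys 1 to 5. Since keyBy sends all records with the same key to a single subtask, no more than five of the 20 parallel reduce subtasks can receive that traffic, which is consistent with the constant backpressure described above.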