Hi,
I am running a program that monitors the delay of a stream, counted per
minute of delay. This is my code:

.map(new RichMapFunction[String,(Long,Int)] {
    override def map(in: String): (Long,Int) = {
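      try {
        val arr = in.split("\\|")
        // Key is the delay in whole minutes: (now - event time, both in seconds) / 60
        ((System.currentTimeMillis() / 1000 - arr(10).trim.toLong) / 60, 1)
      } catch {
        case e: Exception =>
          // Log the raw record that failed to parse; the filter below drops the null
          System.out.println("data has been dropped: " + in)
          null
      }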
    }
  }).slotSharingGroup("kafkaSource").setParallelism(200)
    .filter(item => item != null && item._1 >= 0)
    .slotSharingGroup("kafkaSource").setParallelism(200)

signalSource.keyBy(f => f._1)
  .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
  .reduce { (e1, e2) => (e1._1, e1._2 + e2._2) }
  .setParallelism(20).slotSharingGroup("Delay")
  .addSink(new OracleSink)
  .setParallelism(1).slotSharingGroup("OracleSink").name("OracleSinkDelay")

But there is a problem: when the data is not delayed, the keys 1, 2, 3, 4, and 5
receive so much data that the backpressure is always 1. Is there any way to
avoid this?
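
To show what I mean by the skew, here is a minimal standalone sketch in plain Scala, without Flink. The object name DelayKeySkew, the helper delayMinutes, and all sample values are made up for illustration; only the division by 60 is the same as in my job. As far as I understand, with so few distinct keys only a handful of the 20 reduce subtasks can receive data.

```scala
// Standalone sketch (no Flink): how the key is derived and why on-time
// records pile up on a few keys. All sample values are hypothetical.
object DelayKeySkew {
  // Same arithmetic as in the map function: delay in whole minutes.
  def delayMinutes(nowSec: Long, eventSec: Long): Long = (nowSec - eventSec) / 60

  // Count records per key, the way keyBy + reduce would group them.
  def countsPerKey(nowSec: Long, eventSecs: Seq[Long]): Map[Long, Int] =
    eventSecs
      .map(ts => delayMinutes(nowSec, ts))
      .groupBy(identity)
      .map { case (key, records) => key -> records.size }

  def main(args: Array[String]): Unit = {
    val nowSec = 1000000L // hypothetical "current time" in seconds
    // Hypothetical delays in seconds: most records are only a few minutes late.
    val delaysSec = Seq(65L, 70L, 118L, 125L, 130L, 185L, 240L, 301L, 3600L)
    val counts = countsPerKey(nowSec, delaysSec.map(d => nowSec - d))
    counts.toSeq.sortBy(_._1).foreach { case (key, n) =>
      println(s"key=$key count=$n")
    }
  }
}
```

Almost all records end up on keys 1 to 5, and only the one record that is an hour late gets a different key.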

Please give me some advice. Thank you so much.
