Hi,

What you could do to improve the processing of skewed data is to introduce
an artificial pre-aggregation. You could add an artificial, uniformly
distributed secondary key, calculate your aggregates on (original key,
secondary uniform key), and then do the final aggregation per original
key in an additional step.
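A minimal, Flink-free sketch of this two-phase ("salted") aggregation on plain Scala collections, assuming (key, count) records like the ones produced by the `map` in the quoted code. `SaltBuckets`, `preAggregate` and `finalAggregate` are hypothetical names chosen for illustration, not Flink APIs.

```scala
import scala.util.Random

object SaltedAggregation {
  // Hypothetical number of salt buckets per original key. A larger value
  // spreads a hot key over more partial aggregates (and, in Flink, over
  // more parallel window instances).
  val SaltBuckets: Int = 8

  // Phase 1: attach a uniformly distributed secondary key (the "salt") to
  // every (key, count) record and pre-aggregate on (originalKey, salt).
  def preAggregate(records: Seq[(Long, Int)], rnd: Random): Map[(Long, Int), Int] =
    records
      .map { case (key, count) => ((key, rnd.nextInt(SaltBuckets)), count) }
      .groupBy(_._1)
      .map { case (compositeKey, parts) => compositeKey -> parts.map(_._2).sum }

  // Phase 2: drop the salt and combine the (at most SaltBuckets) partial
  // sums of each original key into the final result.
  def finalAggregate(partials: Map[(Long, Int), Int]): Map[Long, Int] =
    partials.toSeq
      .groupBy(_._1._1)
      .map { case (key, parts) => key -> parts.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val rnd = new Random(42)
    // Skewed input: key 1 is "hot", key 7 is rare.
    val records: Seq[(Long, Int)] = Seq.fill(10000)((1L, 1)) ++ Seq.fill(10)((7L, 1))

    val partials = preAggregate(records, rnd)
    // Number of partial aggregates the hot key was split into.
    println(s"key 1 split into ${partials.keys.count(_._1 == 1L)} partial sums")
    println(s"final counts: ${finalAggregate(partials)}")
  }
}
```

In a Flink job, phase 1 would roughly correspond to a `keyBy` on (key, salt) followed by the existing window and `reduce`, and phase 2 to a second `keyBy` on the original key that sums the partial results. How the two window stages line up in time (especially with processing-time windows) is worth checking for your job.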

Best,

Dawid

On 06/05/2021 09:24, jester jim wrote:
> Hi,
> I have a program that monitors the sum of the delay for every minute
> of a stream. This is my code:
> .map(new RichMapFunction[String, (Long, Int)] {
>     override def map(in: String): (Long, Int) = {
>       try {
>         val arr = in.split("\\|")
>         // (delay in minutes, 1); field 10 holds the record's timestamp in seconds
>         ((System.currentTimeMillis() / 1000 - arr(10).trim.toLong) / 60, 1)
>       } catch {
>         case _: Exception =>
>           // log the offending input and drop the record (filtered out below)
>           println("data has been dropped: " + in)
>           null
>       }
>     }
>   }).slotSharingGroup("kafkaSource").setParallelism(200)
>   .filter(item => item != null && item._1 >= 0)
>   .slotSharingGroup("kafkaSource").setParallelism(200)
>
> signalSource.keyBy(f => f._1)
>   .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
>   .reduce { (e1, e2) => (e1._1, e1._2 + e2._2) }
>   .setParallelism(20).slotSharingGroup("Delay")
>   .addSink(new OracleSink)
>   .setParallelism(1).slotSharingGroup("OracleSink").name("OracleSinkDelay")
>
> But there is a problem: when the data is not delayed, the keys 1, 2, 3,
> 4 and 5 receive so much data that the backpressure is always 1. Is there
> any way to avoid this?
>
> Please give me some advice! Thank you so much.
