Thank you for your suggestion, Ramandeep, but the code is not clear to me. 
Could you please explain it? In particular, this part: 

 

Flowable.zip[java.lang.Long, Row, Row](delSt, itF,
  new BiFunction[java.lang.Long, Row, Row]() {

 

Also, is it possible to achieve this without third-party libraries? 

 

Thank you

 

From: Ramandeep Singh <rs5...@nyu.edu> 
Sent: Thursday, January 10, 2019 1:48 AM
To: Sonal Goyal <sonalgoy...@gmail.com>
Cc: em...@yeikel.com; user <user@spark.apache.org>
Subject: Re: Is it possible to rate limit a UDF?

 

Backpressure is the suggested way out here and is the correct approach: it 
rate-limits at the source itself, for safety. Imagine a service with throttling 
enabled; it can outright reject your calls. 
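
For reference, if the job were turned into a Spark Streaming workflow, 
backpressure is enabled purely through configuration. A minimal sketch (the 
100 records/sec cap is an assumed figure, not a recommendation):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // let the backpressure mechanism adapt the ingestion rate dynamically
  .set("spark.streaming.backpressure.enabled", "true")
  // hard upper bound, in records per second per receiver
  .set("spark.streaming.receiver.maxRate", "100")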

 

Even if you split your df, that alone won't achieve your purpose. You can 
combine it with a backpressure-enabled API, or restrict by time.
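
For illustration, here is a minimal JDK-only sketch of restricting by time, 
with no third-party libraries (throttled and callApi are hypothetical names, 
and the per-second budget is an assumed parameter):

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{DataFrame, Row}

// Sketch: pace every partition to at most maxPerSecond calls using
// only Thread.sleep. With N partitions the overall rate is about
// N * maxPerSecond, so size the partition count accordingly.
def throttled(df: DataFrame, maxPerSecond: Int)(callApi: Row => Row): DataFrame = {
  implicit val encoder = RowEncoder(df.schema)
  val minIntervalMs = 1000L / maxPerSecond
  df.mapPartitions(_.map { row =>
    val start = System.currentTimeMillis()
    val result = callApi(row) // the rate-limited service call
    val elapsed = System.currentTimeMillis() - start
    if (elapsed < minIntervalMs) Thread.sleep(minIntervalMs - elapsed)
    result
  })
}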

 

Here's an example using RxJava, if you don't want to use any streaming API. 

import java.util.concurrent.TimeUnit

import io.reactivex.Flowable
import io.reactivex.functions.BiFunction
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.JavaConverters._

def main(args: Array[String]): Unit = {
  val ss = SparkSession.builder().master("local[*]").enableHiveSupport().getOrCreate()

  import ss.sqlContext.implicits._

  val df = ss.read.json("src/main/resources/person.json")
  implicit val encoder = RowEncoder(df.schema)
  // Two partitions => at most two rows are in flight at once.
  val throttled = df.repartition(2).mapPartitions(it => {
    val itF = Flowable.fromIterable[Row](it.toIterable.asJava)
    // One tick per second; zipping the rows against it paces the calls.
    val delSt = Flowable.interval(1, TimeUnit.SECONDS)
    Flowable.zip[java.lang.Long, Row, Row](delSt, itF,
      new BiFunction[java.lang.Long, Row, Row]() {
        override def apply(t1: java.lang.Long, t2: Row): Row = {
          // call the API here
          t2
        }
      }).toList.blockingGet().iterator().asScala
  })
  throttled.show()
}
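
Zipping the rows against Flowable.interval(1, TimeUnit.SECONDS) paces each 
partition to one row per second, so with two partitions the overall rate is 
about two calls per second; tune the interval and the partition count to 
match what the service can handle.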

 

On Wed, Jan 9, 2019 at 6:12 AM Sonal Goyal <sonalgoy...@gmail.com> wrote:

Have you tried controlling the number of partitions of the dataframe? Say you 
have 5 partitions: that means you are making 5 concurrent calls to the web 
service. The throughput of the web service would be your bottleneck and the 
Spark workers would be waiting for tasks, but if you can't control the REST 
service, maybe it's worth a shot.  
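
For illustration, a minimal sketch of that idea (callService is a 
hypothetical wrapper around the REST call, and the count of 5 is assumed):

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{DataFrame, Row}

// Sketch: coalesce to 5 partitions so that at most 5 tasks
// call the web service concurrently.
def withCappedConcurrency(df: DataFrame)(callService: Row => Row): DataFrame = {
  implicit val encoder = RowEncoder(df.schema)
  df.coalesce(5).mapPartitions(_.map(callService))
}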




Thanks,
Sonal
Nube Technologies 
<http://www.nubetech.co>

On Wed, Jan 9, 2019 at 4:51 AM <em...@yeikel.com> wrote:

I have a data frame to which I apply a UDF that calls a REST web service. 
This web service is distributed across only a few nodes, and it won't be able 
to handle a massive load from Spark. 

 

Is it possible to rate limit this UDF? For example, something like 100 ops/s. 

 

If not, what are the options? Is splitting the df an option? 

 

I've read a similar question on Stack Overflow [1], and the suggested 
solution is Spark Streaming, but my application does not involve streaming. 
Do I need to turn the operations into a streaming workflow to achieve 
something like that? 

 

Current Workflow : Hive -> Spark ->  Service

 

Thank you

 

[1] 
https://stackoverflow.com/questions/43953882/how-to-rate-limit-a-spark-map-operation
 

-- 

Regards,

Ramandeep Singh

Blog: http://ramannanda.blogspot.com
