RE: Is it possible to rate limit a UDF?

2019-01-12 Thread email
Thank you for your suggestion, Ramandeep, but the code is not clear to me. 
Could you please explain it? Particularly this part:

 

Flowable.zip[java.lang.Long, Row, Row](delSt, itF, new BiFunction[java.lang.Long, Row, Row]() {

 

Also, is it possible to achieve this without third-party libraries?
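
Something along the lines of the rough, untested sketch below is what I have in mind: pacing each partition with a plain Thread.sleep and no extra dependencies (callService and intervalMs here are placeholders for the real REST call and rate).

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// Placeholder for the real REST call made inside the UDF
def callService(row: Row): Unit = ()

// Pace each partition to roughly one call every intervalMs milliseconds using
// only the JDK. Note the effective rate is per partition, so N partitions give
// N times this rate, and Thread.sleep blocks the task thread between calls.
def throttle(df: DataFrame, intervalMs: Long): DataFrame = {
  implicit val encoder = RowEncoder(df.schema)
  df.mapPartitions { rows =>
    rows.map { row =>
      Thread.sleep(intervalMs)
      callService(row)
      row
    }
  }
}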

 

Thank you

 

From: Ramandeep Singh  
Sent: Thursday, January 10, 2019 1:48 AM
To: Sonal Goyal 
Cc: em...@yeikel.com; user 
Subject: Re: Is it possible to rate limit a UDF?

 

Backpressure is the suggested way out here and is the correct approach; it rate 
limits at the source itself for safety. Imagine a service with throttling 
enabled: it can outright reject your calls.

 

Even if you split your df, that alone won't achieve your purpose. You can 
combine that with a backpressure-enabled API or with restricting by time.

 

Here's an example using RxJava, if you don't want to use any streaming API.

import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import io.reactivex.Flowable
import io.reactivex.functions.BiFunction
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

def main(args: Array[String]): Unit = {
  val ss = SparkSession.builder().master("local[*]").enableHiveSupport().getOrCreate()

  import ss.sqlContext.implicits._

  val df = ss.read.json("src/main/resources/person.json")
  implicit val encoder = RowEncoder(df.schema)

  // Zip each partition's rows with a 1-second interval so that at most one
  // row per second per partition reaches the service call.
  val throttled = df.repartition(2).mapPartitions(it => {
    val itF = Flowable.fromIterable[Row](it.toIterable.asJava)
    val delSt = Flowable.interval(1, TimeUnit.SECONDS)
    Flowable.zip[java.lang.Long, Row, Row](delSt, itF,
      new BiFunction[java.lang.Long, Row, Row]() {
        override def apply(t1: java.lang.Long, t2: Row): Row = {
          // call api here
          t2
        }
      }).toList.blockingGet().iterator().asScala
  })
  throttled.show()
}

 

On Wed, Jan 9, 2019 at 6:12 AM Sonal Goyal <sonalgoy...@gmail.com> wrote:

Have you tried controlling the number of partitions of the dataframe? Say you 
have 5 partitions: it means you are making 5 concurrent calls to the web 
service. The throughput of the web service would be your bottleneck and Spark 
workers would be waiting for tasks, but if you can't control the REST service, 
maybe it's worth a shot.
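
Purely as an illustrative, untested sketch (the partition count of 5, the "payload" column and callService are all made up), that could look like:

import org.apache.spark.sql.functions.{col, udf}

// Capping the DataFrame at 5 partitions means at most 5 concurrent tasks and
// therefore at most 5 simultaneous calls to the web service.
def callService(payload: String): String = ???   // placeholder REST call
val callServiceUdf = udf(callService _)
val limited = df.coalesce(5).withColumn("result", callServiceUdf(col("payload")))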




Thanks,
Sonal
Nube Technologies

On Wed, Jan 9, 2019 at 4:51 AM <em...@yeikel.com> wrote:

I have a data frame for which I apply a UDF that calls a REST web service. 
This web service is deployed on only a few nodes and it won't be able to 
handle a massive load from Spark.

 

Is it possible to rate limit this UDF? For example, something like 100 ops/s.

 

If not, what are the options? Is splitting the df an option?

 

I've read a similar question on Stack Overflow [1] and the solution suggests 
Spark Streaming, but my application does not involve streaming. Do I need to 
turn the operations into a streaming workflow to achieve something like that?

 

Current workflow: Hive -> Spark -> Service

 

Thank you

 

[1] 
https://stackoverflow.com/questions/43953882/how-to-rate-limit-a-spark-map-operation
 

 




 

-- 

Regards,

Ramandeep Singh

Blog: http://ramannanda.blogspot.com



Re: spark2.4 arrow enabled true, error log not returned

2019-01-12 Thread Felix Cheung
Do you mean you run the same code on YARN and standalone? Can you check if they 
are running the same Python versions?



From: Bryan Cutler 
Sent: Thursday, January 10, 2019 5:29 PM
To: libinsong1...@gmail.com
Cc: zlist Spark
Subject: Re: spark2.4 arrow enabled true, error log not returned

Hi, could you please clarify if you are running a YARN cluster when you see 
this problem?  I tried on Spark standalone and could not reproduce.  If it's on 
a YARN cluster, please file a JIRA and I can try to investigate further.

Thanks,
Bryan

On Sat, Dec 15, 2018 at 3:42 AM 李斌松 <libinsong1...@gmail.com> wrote:
With spark2.4 and arrow enabled (true), the error log is not returned; in spark 2.3 
there's no such problem.

1. spark.sql.execution.arrow.enabled=true
[image.png]
yarn log:

18/12/15 14:35:52 INFO CodeGenerator: Code generated in 1030.698785 ms
18/12/15 14:35:54 INFO PythonRunner: Times: total = 1985, boot = 1892, init = 
92, finish = 1
18/12/15 14:35:54 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1799 
bytes result sent to driver
18/12/15 14:35:55 INFO CoarseGrainedExecutorBackend: Got assigned task 1
18/12/15 14:35:55 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
18/12/15 14:35:55 INFO TorrentBroadcast: Started reading broadcast variable 1
18/12/15 14:35:55 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in 
memory (estimated size 8.3 KB, free 1048.8 MB)
18/12/15 14:35:55 INFO TorrentBroadcast: Reading broadcast variable 1 took 18 ms
18/12/15 14:35:55 INFO MemoryStore: Block broadcast_1 stored as values in 
memory (estimated size 14.0 KB, free 1048.8 MB)
18/12/15 14:35:55 INFO CodeGenerator: Code generated in 30.269745 ms
18/12/15 14:35:55 INFO PythonRunner: Times: total = 13, boot = 5, init = 7, 
finish = 1
18/12/15 14:35:55 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1893 
bytes result sent to driver
18/12/15 14:35:55 INFO CoarseGrainedExecutorBackend: Got assigned task 2
18/12/15 14:35:55 INFO Executor: Running task 1.0 in stage 1.0 (TID 2)
18/12/15 14:35:55 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 2)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/install/pyspark/2.4.0/pyspark.zip/pyspark/worker.py", line 377, in 
main
process()
  File "/usr/install/pyspark/2.4.0/pyspark.zip/pyspark/worker.py", line 372, in 
process
serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/install/pyspark/2.4.0/pyspark.zip/pyspark/serializers.py", line 
390, in dump_stream
vs = list(itertools.islice(iterator, batch))
  File "/usr/install/pyspark/2.4.0/pyspark.zip/pyspark/util.py", line 99, in 
wrapper
return f(*args, **kwargs)
  File 
"/yarn/nm/usercache/admin/appcache/application_1544579748138_0215/container_e43_1544579748138_0215_01_01/python1.py",
 line 435, in mapfunc
ValueError: could not convert string to float: 'a'

at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.hasNext(ArrowConverters.scala:99)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at 
org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.foreach(ArrowConverters.scala:97)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at 
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at 
org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.to(ArrowConverters.scala:97)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at 
org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.toBuffer(ArrowConverters.scala:97)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at 
org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.toArray(ArrowConverters.scala:97)
at 
org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17$$anonfun$apply$18.apply(Dataset.scala:3314)
at 
org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17$$anonfun$apply$18.apply(Dataset.scala:3314)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)