[jira] [Comment Edited] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411246#comment-16411246 ]

Deepansh edited comment on SPARK-23650 at 3/23/18 1:43 PM:
---
The R environment inside the thread that applies the UDF is not being reused (I think "cached" is not the right word in this context). It is created and destroyed with each query.

{code:R}
kafka <- read.stream("kafka", subscribe = "source", kafka.bootstrap.servers = "10.117.172.48:9092", topic = "source")
lines <- select(kafka, cast(kafka$value, "string"))
schema <- schema(lines)
library(caret)
df4 <- dapply(lines, function(x) {
  print(system.time(library(caret)))
  x
}, schema)
q2 <- write.stream(df4, "kafka", checkpointLocation = loc, topic = "sink", kafka.bootstrap.servers = "10.117.172.48:9092")
awaitTermination(q2)
{code}

For the above code, for every new stream my output is:

18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread:     histogram
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:12 INFO BufferedStreamThread:    user  system elapsed
18/03/23 11:08:12 INFO BufferedStreamThread:   1.937   0.062   1.999
18/03/23 11:08:12 INFO RRunner: Times: boot = 0.009 s, init = 0.017 s, broadcast = 0.001 s, read-input = 0.001 s, compute = 2.064 s, write-output = 0.001 s, total = 2.093 s

PFA: rest of the log file.

For every new incoming stream, the packages are loaded again inside the thread, which means the R environment inside the thread is not being reused; it is created and destroyed every time.
The model (an iris model) I am testing with requires the caret package. So when I use readRDS, caret is also loaded, which adds an overhead of ~2 s every time. The same problem occurs with broadcast: broadcasting the model doesn't take time, but deserializing it loads caret, which adds the same ~2 s overhead. Ideally, the packages shouldn't be loaded again. Is there a way around this problem?
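One possible workaround is to guard the expensive library() call inside the UDF so it only pays the load cost when caret is not already attached. This is only a sketch, and it only helps under the assumption that the daemon R worker process actually survives between micro-batches (which is exactly the behaviour in question in this issue):

{code:R}
# Sketch: conditional package load inside the dapply UDF.
# loadedNamespaces() is base R; the guard skips the ~2 s caret load
# whenever the namespace is already present in the worker's R session.
df4 <- dapply(lines, function(x) {
  if (!"caret" %in% loadedNamespaces()) {
    suppressMessages(library(caret))
  }
  x
}, schema)
{code}

If the worker is torn down after every batch, the guard is always false and this changes nothing, which would confirm the environment is not being reused.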
[jira] [Comment Edited] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16404213#comment-16404213 ]

Deepansh edited comment on SPARK-23650 at 3/18/18 10:16 PM:
---
I tried reading the model in the UDF, but for every new stream the model is read again, which adds an overhead of ~2 s. IMO the problem here is that the R environment inside the thread that applies the UDF is not being cached; it is created and destroyed with each query. Attached: logs.

To work around the problem, I was using broadcast, since technically a broadcast is sent only once to the executors.

> Slow SparkR udf (dapply)
>
> Key: SPARK-23650
> URL: https://issues.apache.org/jira/browse/SPARK-23650
> Project: Spark
> Issue Type: Improvement
> Components: Spark Shell, SparkR, Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Deepansh
> Priority: Major
> Attachments: read_model_in_udf.txt, sparkR_log2.txt, sparkRlag.txt
>
> For eg, I am getting streams from Kafka and I want to implement a model made
> in R for those streams. For this, I am using dapply.
> My code is:
> iris_model <- readRDS("./iris_model.rds")
> randomBr <- SparkR:::broadcast(sc, iris_model)
> kafka <- read.stream("kafka", subscribe = "source", kafka.bootstrap.servers = "localhost:9092", topic = "source")
> lines <- select(kafka, cast(kafka$value, "string"))
> schema <- schema(lines)
> df1 <- dapply(lines, function(x) {
>   i_model <- SparkR:::value(randomBr)
>   for (row in 1:nrow(x)) {
>     y <- fromJSON(as.character(x[row, "value"]))
>     y$predict <- predict(i_model, y)
>     y <- toJSON(y)
>     x[row, "value"] <- y
>   }
>   x
> }, schema)
> Every time Kafka streams are fetched, the dapply method creates a new runner thread and ships the variables again, which causes a huge lag (~2 s for shipping the model) every time. I even tried without broadcast variables, but it takes the same time to ship the variables. Can some other technique be applied to improve its performance?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
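An alternative to broadcasting the model is to cache the deserialized model inside the R worker itself, so readRDS (and the caret load it triggers) runs at most once per worker process rather than once per micro-batch. This is only a sketch under the assumption that the daemon R worker is reused across batches; get_model is a hypothetical helper, not part of SparkR:

{code:R}
# Hypothetical helper: deserialize the model at most once per R worker
# process by stashing it in the worker's global environment.
get_model <- function(path = "./iris_model.rds") {
  if (!exists(".cached_model", envir = globalenv())) {
    assign(".cached_model", readRDS(path), envir = globalenv())
  }
  get(".cached_model", envir = globalenv())
}

df1 <- dapply(lines, function(x) {
  i_model <- get_model()  # ~2 s on the first call in a worker, cheap afterwards
  for (row in 1:nrow(x)) {
    y <- fromJSON(as.character(x[row, "value"]))
    y$predict <- predict(i_model, y)
    x[row, "value"] <- toJSON(y)
  }
  x
}, schema)
{code}

This requires the model file to be readable from every executor (e.g. shipped via --files or on a shared filesystem), and, like the broadcast approach, it gains nothing if the worker is destroyed after each query.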
[jira] [Comment Edited] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401525#comment-16401525 ]

Felix Cheung edited comment on SPARK-23650 at 3/16/18 7:16 AM:
---
Do you mean this?

RRunner: Times: boot = 0.010 s, init = 0.005 s, broadcast = 1.894 s, read-input = 0.001 s, compute = 0.062 s, write-output = 0.002 s, total = 1.974 s

Under the cover it is working with the same R process. I see

SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006

each time: it is creating a new broadcast, which then needs to be transferred. IMO there are a few things to look into:
# it should detect whether the broadcast is the same (not sure if it does that)
# if it is attributed to the same broadcast in use.daemon mode, it perhaps doesn't have to transfer it again (but it would need to keep track of the stage executed before, the broadcast that was sent before, etc.)
# data transfer can be faster (SPARK-18924)

However, as of now RRunner simply picks up the broadcast that is passed to it and sends it.
[jira] [Comment Edited] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398237#comment-16398237 ]

Deepansh edited comment on SPARK-23650 at 3/15/18 4:39 AM:
---
Attached more logs.
[jira] [Comment Edited] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396834#comment-16396834 ]

Deepansh edited comment on SPARK-23650 at 3/13/18 11:37 AM:
---
I tried on local as well as on a YARN cluster; the result is more or less the same. Because of this, I went through the Spark code, and as far as I understand it, every time a new Kafka stream arrives, Spark creates a new RRunner object, and the broadcast variables and packages are shipped off to create a new R worker. But shouldn't this happen only once, rather than every time a stream arrives?

PFA: log file