[jira] [Commented] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494708#comment-16494708 ]

Felix Cheung commented on SPARK-23650:
--
Sorry, I really don't have the time/resources to investigate this for now. Hopefully later...

> Slow SparkR udf (dapply)
> ------------------------
>
>                 Key: SPARK-23650
>                 URL: https://issues.apache.org/jira/browse/SPARK-23650
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Shell, SparkR, Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Deepansh
>            Priority: Major
>         Attachments: packageReload.txt, read_model_in_udf.txt, sparkR_log2.txt, sparkRlag.txt
>
> For example, I am reading streams from Kafka and I want to apply a model built in R to those streams. For this, I am using dapply.
> My code is:
>
> iris_model <- readRDS("./iris_model.rds")
> randomBr <- SparkR:::broadcast(sc, iris_model)
> kafka <- read.stream("kafka", subscribe = "source", kafka.bootstrap.servers = "localhost:9092", topic = "source")
> lines <- select(kafka, cast(kafka$value, "string"))
> schema <- schema(lines)
> df1 <- dapply(lines, function(x) {
>   i_model <- SparkR:::value(randomBr)
>   for (row in 1:nrow(x)) {
>     y <- fromJSON(as.character(x[row, "value"]))
>     y$predict <- predict(i_model, y)
>     y <- toJSON(y)
>     x[row, "value"] <- y
>   }
>   x
> }, schema)
>
> Every time Kafka streams are fetched, the dapply method creates a new runner thread and ships the variables again, which causes a huge lag (~2 s for shipping the model) every time. I even tried without broadcast variables, but it takes the same time to ship the variables. Can some other technique be applied to improve its performance?
[jira] [Commented] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421933#comment-16421933 ]

Deepansh commented on SPARK-23650:
--
@[~felixcheung] following up on above discussion.
[jira] [Commented] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411246#comment-16411246 ]

Deepansh commented on SPARK-23650:
--
The R environment inside the thread that applies the UDF is not getting cached; it is created and destroyed with each query.

{code:R}
kafka <- read.stream("kafka", subscribe = "source", kafka.bootstrap.servers = "10.117.172.48:9092", topic = "source")
lines <- select(kafka, cast(kafka$value, "string"))
schema <- schema(lines)
library(caret)
df4 <- dapply(lines, function(x) {
  print(system.time(library(caret)))
  x
}, schema)
q2 <- write.stream(df4, "kafka", checkpointLocation = loc, topic = "sink", kafka.bootstrap.servers = "10.117.172.48:9092")
awaitTermination(q2)
{code}

For the above code, the output for every new stream is:

18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread:     histogram
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:12 INFO BufferedStreamThread:    user  system elapsed
18/03/23 11:08:12 INFO BufferedStreamThread:   1.937   0.062   1.999
18/03/23 11:08:12 INFO RRunner: Times: boot = 0.009 s, init = 0.017 s, broadcast = 0.001 s, read-input = 0.001 s, compute = 2.064 s, write-output = 0.001 s, total = 2.093 s

PFA: the rest of the log file.

For every new incoming stream, the packages are loaded again inside the thread, which means the R environment inside the thread is not being reused; it is created and destroyed every time. The model I am testing with (the iris model) requires the caret package, so when I use readRDS, the caret package is loaded as well, which adds an overhead of ~2 s every time. The same problem applies to the broadcast: broadcasting the model doesn't take time, but deserializing the model loads the caret package, which adds the same ~2 s overhead. Ideally, the packages shouldn't be loaded again. Is there a way around this problem?
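If the executor-side R process were actually reused across micro-batches (which is exactly the behaviour in question here), one workaround would be to memoise the expensive work in the worker's global environment so that library(caret) and readRDS() only run the first time a given R process executes the UDF. A rough sketch under that assumption — the file path and cache variable name are illustrative, and it is a no-op improvement if Spark really does start a fresh R worker per query:

{code:R}
df4 <- dapply(lines, function(x) {
  # Load the package and the model only once per R worker process and keep
  # them in the worker's global environment; later batches on the same worker
  # reuse the cached objects instead of paying the ~2 s attach cost again.
  if (!exists(".cached_iris_model", envir = globalenv())) {
    suppressMessages(library(caret))
    assign(".cached_iris_model", readRDS("./iris_model.rds"), envir = globalenv())
  }
  i_model <- get(".cached_iris_model", envir = globalenv())
  x
}, schema)
{code}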
[jira] [Commented] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410856#comment-16410856 ]

Felix Cheung commented on SPARK-23650:
--
can you clarify where you see "R environment inside the thread for applying UDF is not getting cached. It is created and destroyed with each query."?
[jira] [Commented] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404213#comment-16404213 ]

Deepansh commented on SPARK-23650:
--
I tried reading the model in the UDF, but for every new stream the model is read again, which adds an overhead of ~2 s. IMO the problem here is that the R environment is not getting cached; it is created and destroyed with each query. Logs attached.

To overcome the problem, I was using broadcast, since technically the broadcast should only be sent to the executors once.
[jira] [Commented] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404198#comment-16404198 ]

Felix Cheung commented on SPARK-23650:
--
Is there a reason for the broadcast? Could you instead distribute the .rds to all the executors and then call readRDS from within your UDF? I understand this approach has been used quite a bit.
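A minimal sketch of that suggestion (illustrative only — it assumes iris_model.rds has already been copied to the same local path on every executor node, e.g. by provisioning or shared storage, and that the JSON helpers come from jsonlite):

{code:R}
df1 <- dapply(lines, function(x) {
  # Read the pre-distributed model file locally on the executor; no broadcast
  # is involved, so nothing is shipped from the driver per batch.
  i_model <- readRDS("/opt/models/iris_model.rds")   # hypothetical local path
  for (row in 1:nrow(x)) {
    y <- jsonlite::fromJSON(as.character(x[row, "value"]))
    y$predict <- predict(i_model, y)
    x[row, "value"] <- as.character(jsonlite::toJSON(y))
  }
  x
}, schema)
{code}

As the later comments in this thread show, the dominant cost is not reading the .rds itself but attaching the packages its deserialization pulls in, so this alone may not remove the ~2 s lag.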
[jira] [Commented] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404183#comment-16404183 ]

Deepansh commented on SPARK-23650:
--
Is there any other way to implement my use case with minimal (millisecond-level) overhead?

Use case: ingest data from Kafka streams, apply a native R model to it, and return the predictions to a Kafka sink (or any other sink).
[jira] [Commented] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16401525#comment-16401525 ]

Felix Cheung commented on SPARK-23650:
--
Do you mean this?

RRunner: Times: boot = 0.010 s, init = 0.005 s, broadcast = 1.894 s, read-input = 0.001 s, compute = 0.062 s, write-output = 0.002 s, total = 1.974 s

Under the cover it is working with the same R process. However, I see

SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006

each time, i.e. it is creating a new broadcast, which then needs to be transferred. IMO there are a few things to look into:
# it should detect whether the broadcast is the same (not sure if it does that)
# if it is the same broadcast and we are in use.daemon mode, then it perhaps doesn't have to transfer it again
# data transfer could be faster (SPARK-18924)

However, as of now RRunner simply picks up the broadcast that is passed to it and sends it.
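For reference, the use.daemon behaviour in item 2 is governed by an internal, undocumented setting, so treat the following only as an assumption about how to experiment with it (it may differ across Spark versions and does not apply on Windows): RRunner forks R workers from a long-lived daemon process when spark.sparkr.use.daemon is true, but as noted above the broadcast bytes are still written to the worker for every task.

{code:R}
# Hypothetical experiment: make the daemon setting explicit when starting the
# session, then compare the per-trigger "broadcast = ... s" figure in the
# RRunner log lines. spark.sparkr.use.daemon is internal and undocumented.
sparkR.session(sparkConfig = list(spark.sparkr.use.daemon = "true"))
{code}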
[jira] [Commented] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398237#comment-16398237 ]

Deepansh commented on SPARK-23650:
--
attaching more of logs.
[jira] [Commented] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398174#comment-16398174 ]

Felix Cheung commented on SPARK-23650:
--
I see one RRunner - do you have more of the log?
[jira] [Commented] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396834#comment-16396834 ]

Deepansh commented on SPARK-23650:
--
I tried locally as well as on a YARN cluster; the result is more or less the same. Because of this, I went through the Spark code, and as far as I understand, every time a new Kafka micro-batch arrives, Spark creates a new RRunner object and ships the broadcast variables to it again. But shouldn't that happen only once, not on every incoming batch?
[jira] [Commented] (SPARK-23650) Slow SparkR udf (dapply)
[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396607#comment-16396607 ]

Felix Cheung commented on SPARK-23650:
--
which system/platform are you running on?