Github user NarineK commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12836#discussion_r67197168
  
    --- Diff: R/pkg/inst/worker/worker.R ---
    @@ -79,75 +127,72 @@ if (numBroadcastVars > 0) {
     
     # Timing broadcast
     broadcastElap <- elapsedSecs()
    +# Initial input timing
    +inputElap <- broadcastElap
     
     # If -1: read as normal RDD; if >= 0, treat as pairwise RDD and treat the int
     # as number of partitions to create.
     numPartitions <- SparkR:::readInt(inputCon)
     
    -isDataFrame <- as.logical(SparkR:::readInt(inputCon))
    +# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode
    +mode <- SparkR:::readInt(inputCon)
     
    -# If isDataFrame, then read column names
    -if (isDataFrame) {
    +if (mode > 0) {
       colNames <- SparkR:::readObject(inputCon)
     }
     
     isEmpty <- SparkR:::readInt(inputCon)
    +computeInputElapsDiff <- 0
    +outputComputeElapsDiff <- 0
     
     if (isEmpty != 0) {
    -
       if (numPartitions == -1) {
         if (deserializer == "byte") {
           # Now read as many characters as described in funcLen
           data <- SparkR:::readDeserialize(inputCon)
         } else if (deserializer == "string") {
           data <- as.list(readLines(inputCon))
    -    } else if (deserializer == "row") {
    +    } else if (deserializer == "row" && mode == 2) {
    +      dataWithKeys <- SparkR:::readMultipleObjectsWithKeys(inputCon)
    +      keys <- dataWithKeys$keys
    +      data <- dataWithKeys$data
    +    } else if (deserializer == "row"){
           data <- SparkR:::readMultipleObjects(inputCon)
         }
    -    # Timing reading input data for execution
    -    inputElap <- elapsedSecs()
     
    -    if (isDataFrame) {
    -      if (deserializer == "row") {
    -        # Transform the list of rows into a data.frame
    -        # Note that the optional argument stringsAsFactors for rbind is
    -        # available since R 3.2.4. So we set the global option here.
    -        oldOpt <- getOption("stringsAsFactors")
    -        options(stringsAsFactors = FALSE)
    -        data <- do.call(rbind.data.frame, data)
    -        options(stringsAsFactors = oldOpt)
    -
    -        names(data) <- colNames
    -      } else {
    -        # Check to see if data is a valid data.frame
    -        stopifnot(deserializer == "byte")
    -        stopifnot(class(data) == "data.frame")
    -      }
    -      output <- computeFunc(data)
    -      if (serializer == "row") {
    -        # Transform the result data.frame back to a list of rows
    -        output <- split(output, seq(nrow(output)))
    -      } else {
    -        # Serialize the ouput to a byte array
    -        stopifnot(serializer == "byte")
    +    inputElap <- elapsedSecs()
    +    if (mode > 0) {
    +      if (mode == 1) {
    +        # Timing reading input data for execution
    +        output <- compute(mode, partition, serializer, deserializer, NULL,
    +                    colNames, computeFunc, outputCon, data)
    +       } else {
    +        # gapply mode
    +        for (i in 1:length(data)) {
    +          # Timing reading input data for execution
    +          inputElap <- elapsedSecs()
    +          output <- compute(mode, partition, serializer, deserializer, keys[[i]],
    +                      colNames, computeFunc, outputCon, data[[i]])
    +          computeElap <- elapsedSecs()
    +          outputResult(serializer, output, outputCon)
    +          outputElap <- elapsedSecs()
    +          computeInputElapsDiff <-  computeInputElapsDiff + (computeElap - inputElap)
    +          outputComputeElapsDiff <- outputComputeElapsDiff + (outputElap - computeElap)
    +        }
           }
         } else {
    -      output <- computeFunc(partition, data)
    +      # Timing reading input data for execution
    --- End diff --
    
    Removed the comment here; it has been moved to line 163:
    
https://github.com/NarineK/spark/blob/4d1cc6b0fd3dd2839a6a879f43118c5828916733/R/pkg/inst/worker/worker.R#L163


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to