Repository: spark
Updated Branches:
  refs/heads/master 1b2aab8d5 -> 2fe0a1aae


http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
new file mode 100644
index 0000000..ccb2a37
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.r
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.sql.{Date, Time}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Utility functions to serialize and deserialize objects to / from R
+ */
+private[spark] object SerDe {
+
+  // Type mapping from R to Java
+  //
+  // NULL -> void
+  // integer -> Int
+  // character -> String
+  // logical -> Boolean
+  // double, numeric -> Double
+  // raw -> Array[Byte]
+  // Date -> Date
+  // POSIXlt/POSIXct -> Time
+  //
+  // list[T] -> Array[T], where T is one of above mentioned types
+  // environment -> Map[String, T], where T is a native type
+  // jobj -> Object, where jobj is an object created in the backend
+
+  def readObjectType(dis: DataInputStream): Char = {
+    dis.readByte().toChar
+  }
+
+  def readObject(dis: DataInputStream): Object = {
+    val dataType = readObjectType(dis)
+    readTypedObject(dis, dataType)
+  }
+
+  def readTypedObject(
+      dis: DataInputStream,
+      dataType: Char): Object = {
+    dataType match {
+      case 'n' => null
+      case 'i' => new java.lang.Integer(readInt(dis))
+      case 'd' => new java.lang.Double(readDouble(dis))
+      case 'b' => new java.lang.Boolean(readBoolean(dis))
+      case 'c' => readString(dis)
+      case 'e' => readMap(dis)
+      case 'r' => readBytes(dis)
+      case 'l' => readList(dis)
+      case 'D' => readDate(dis)
+      case 't' => readTime(dis)
+      case 'j' => JVMObjectTracker.getObject(readString(dis))
+      case _ => throw new IllegalArgumentException(s"Invalid type $dataType")
+    }
+  }
+
+  def readBytes(in: DataInputStream): Array[Byte] = {
+    val len = readInt(in)
+    val out = new Array[Byte](len)
+    val bytesRead = in.readFully(out)
+    out
+  }
+
+  def readInt(in: DataInputStream): Int = {
+    in.readInt()
+  }
+
+  def readDouble(in: DataInputStream): Double = {
+    in.readDouble()
+  }
+
+  def readString(in: DataInputStream): String = {
+    val len = in.readInt()
+    val asciiBytes = new Array[Byte](len)
+    in.readFully(asciiBytes)
+    assert(asciiBytes(len - 1) == 0)
+    val str = new String(asciiBytes.dropRight(1).map(_.toChar))
+    str
+  }
+
+  def readBoolean(in: DataInputStream): Boolean = {
+    val intVal = in.readInt()
+    if (intVal == 0) false else true
+  }
+
+  def readDate(in: DataInputStream): Date = {
+    Date.valueOf(readString(in))
+  }
+
+  def readTime(in: DataInputStream): Time = {
+    val t = in.readDouble()
+    new Time((t * 1000L).toLong)
+  }
+
+  def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
+    val len = readInt(in)
+    (0 until len).map(_ => readBytes(in)).toArray
+  }
+
+  def readIntArr(in: DataInputStream): Array[Int] = {
+    val len = readInt(in)
+    (0 until len).map(_ => readInt(in)).toArray
+  }
+
+  def readDoubleArr(in: DataInputStream): Array[Double] = {
+    val len = readInt(in)
+    (0 until len).map(_ => readDouble(in)).toArray
+  }
+
+  def readBooleanArr(in: DataInputStream): Array[Boolean] = {
+    val len = readInt(in)
+    (0 until len).map(_ => readBoolean(in)).toArray
+  }
+
+  def readStringArr(in: DataInputStream): Array[String] = {
+    val len = readInt(in)
+    (0 until len).map(_ => readString(in)).toArray
+  }
+
+  def readList(dis: DataInputStream): Array[_] = {
+    val arrType = readObjectType(dis)
+    arrType match {
+      case 'i' => readIntArr(dis)
+      case 'c' => readStringArr(dis)
+      case 'd' => readDoubleArr(dis)
+      case 'b' => readBooleanArr(dis)
+      case 'j' => readStringArr(dis).map(x => JVMObjectTracker.getObject(x))
+      case 'r' => readBytesArr(dis)
+      case _ => throw new IllegalArgumentException(s"Invalid array type $arrType")
+    }
+  }
+
+  def readMap(in: DataInputStream): java.util.Map[Object, Object] = {
+    val len = readInt(in)
+    if (len > 0) {
+      val keysType = readObjectType(in)
+      val keysLen = readInt(in)
+      val keys = (0 until keysLen).map(_ => readTypedObject(in, keysType))
+
+      val valuesType = readObjectType(in)
+      val valuesLen = readInt(in)
+      val values = (0 until valuesLen).map(_ => readTypedObject(in, valuesType))
+      mapAsJavaMap(keys.zip(values).toMap)
+    } else {
+      new java.util.HashMap[Object, Object]()
+    }
+  }
+
+  // Methods to write out data from Java to R
+  //
+  // Type mapping from Java to R
+  //
+  // void -> NULL
+  // Int -> integer
+  // String -> character
+  // Boolean -> logical
+  // Double -> double
+  // Long -> double
+  // Array[Byte] -> raw
+  // Date -> Date
+  // Time -> POSIXct
+  //
+  // Array[T] -> list()
+  // Object -> jobj
+
+  def writeType(dos: DataOutputStream, typeStr: String): Unit = {
+    typeStr match {
+      case "void" => dos.writeByte('n')
+      case "character" => dos.writeByte('c')
+      case "double" => dos.writeByte('d')
+      case "integer" => dos.writeByte('i')
+      case "logical" => dos.writeByte('b')
+      case "date" => dos.writeByte('D')
+      case "time" => dos.writeByte('t')
+      case "raw" => dos.writeByte('r')
+      case "list" => dos.writeByte('l')
+      case "jobj" => dos.writeByte('j')
+      case _ => throw new IllegalArgumentException(s"Invalid type $typeStr")
+    }
+  }
+
+  def writeObject(dos: DataOutputStream, value: Object): Unit = {
+    if (value == null) {
+      writeType(dos, "void")
+    } else {
+      value.getClass.getName match {
+        case "java.lang.String" =>
+          writeType(dos, "character")
+          writeString(dos, value.asInstanceOf[String])
+        case "long" | "java.lang.Long" =>
+          writeType(dos, "double")
+          writeDouble(dos, value.asInstanceOf[Long].toDouble)
+        case "double" | "java.lang.Double" =>
+          writeType(dos, "double")
+          writeDouble(dos, value.asInstanceOf[Double])
+        case "int" | "java.lang.Integer" =>
+          writeType(dos, "integer")
+          writeInt(dos, value.asInstanceOf[Int])
+        case "boolean" | "java.lang.Boolean" =>
+          writeType(dos, "logical")
+          writeBoolean(dos, value.asInstanceOf[Boolean])
+        case "java.sql.Date" =>
+          writeType(dos, "date")
+          writeDate(dos, value.asInstanceOf[Date])
+        case "java.sql.Time" =>
+          writeType(dos, "time")
+          writeTime(dos, value.asInstanceOf[Time])
+        case "[B" =>
+          writeType(dos, "raw")
+          writeBytes(dos, value.asInstanceOf[Array[Byte]])
+        // TODO: Types not handled right now include
+        // byte, char, short, float
+
+        // Handle arrays
+        case "[Ljava.lang.String;" =>
+          writeType(dos, "list")
+          writeStringArr(dos, value.asInstanceOf[Array[String]])
+        case "[I" =>
+          writeType(dos, "list")
+          writeIntArr(dos, value.asInstanceOf[Array[Int]])
+        case "[J" =>
+          writeType(dos, "list")
+          writeDoubleArr(dos, value.asInstanceOf[Array[Long]].map(_.toDouble))
+        case "[D" =>
+          writeType(dos, "list")
+          writeDoubleArr(dos, value.asInstanceOf[Array[Double]])
+        case "[Z" =>
+          writeType(dos, "list")
+          writeBooleanArr(dos, value.asInstanceOf[Array[Boolean]])
+        case "[[B" =>
+          writeType(dos, "list")
+          writeBytesArr(dos, value.asInstanceOf[Array[Array[Byte]]])
+        case otherName =>
+          // Handle array of objects
+          if (otherName.startsWith("[L")) {
+            val objArr = value.asInstanceOf[Array[Object]]
+            writeType(dos, "list")
+            writeType(dos, "jobj")
+            dos.writeInt(objArr.length)
+            objArr.foreach(o => writeJObj(dos, o))
+          } else {
+            writeType(dos, "jobj")
+            writeJObj(dos, value)
+          }
+      }
+    }
+  }
+
+  def writeInt(out: DataOutputStream, value: Int): Unit = {
+    out.writeInt(value)
+  }
+
+  def writeDouble(out: DataOutputStream, value: Double): Unit = {
+    out.writeDouble(value)
+  }
+
+  def writeBoolean(out: DataOutputStream, value: Boolean): Unit = {
+    val intValue = if (value) 1 else 0
+    out.writeInt(intValue)
+  }
+
+  def writeDate(out: DataOutputStream, value: Date): Unit = {
+    writeString(out, value.toString)
+  }
+
+  def writeTime(out: DataOutputStream, value: Time): Unit = {
+    out.writeDouble(value.getTime.toDouble / 1000.0)
+  }
+
+
+  // NOTE: Only works for ASCII right now
+  def writeString(out: DataOutputStream, value: String): Unit = {
+    val len = value.length
+    out.writeInt(len + 1) // For the \0
+    out.writeBytes(value)
+    out.writeByte(0)
+  }
+
+  def writeBytes(out: DataOutputStream, value: Array[Byte]): Unit = {
+    out.writeInt(value.length)
+    out.write(value)
+  }
+
+  def writeJObj(out: DataOutputStream, value: Object): Unit = {
+    val objId = JVMObjectTracker.put(value)
+    writeString(out, objId)
+  }
+
+  def writeIntArr(out: DataOutputStream, value: Array[Int]): Unit = {
+    writeType(out, "integer")
+    out.writeInt(value.length)
+    value.foreach(v => out.writeInt(v))
+  }
+
+  def writeDoubleArr(out: DataOutputStream, value: Array[Double]): Unit = {
+    writeType(out, "double")
+    out.writeInt(value.length)
+    value.foreach(v => out.writeDouble(v))
+  }
+
+  def writeBooleanArr(out: DataOutputStream, value: Array[Boolean]): Unit = {
+    writeType(out, "logical")
+    out.writeInt(value.length)
+    value.foreach(v => writeBoolean(out, v))
+  }
+
+  def writeStringArr(out: DataOutputStream, value: Array[String]): Unit = {
+    writeType(out, "character")
+    out.writeInt(value.length)
+    value.foreach(v => writeString(out, v))
+  }
+
+  def writeBytesArr(out: DataOutputStream, value: Array[Array[Byte]]): Unit = {
+    writeType(out, "raw")
+    out.writeInt(value.length)
+    value.foreach(v => writeBytes(out, v))
+  }
+}
+
+private[r] object SerializationFormats {
+  val BYTE = "byte"
+  val STRING = "string"
+  val ROW = "row"
+}
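
A note on the SerDe wire format above: each value is a one-byte type tag followed by its payload. Below is a minimal sketch of a round trip through these helpers, assuming it runs from code inside the org.apache.spark package (SerDe is private[spark]); the values and stream names are illustrative only:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import org.apache.spark.api.r.SerDe

    // Write an Int, a String and a Double using the tagged format described above.
    val bos = new ByteArrayOutputStream()
    val dos = new DataOutputStream(bos)
    SerDe.writeObject(dos, Int.box(42))       // 'i' tag + 4-byte int
    SerDe.writeObject(dos, "hello")           // 'c' tag + length-prefixed, NUL-terminated ASCII
    SerDe.writeObject(dos, Double.box(3.14))  // 'd' tag + 8-byte double
    dos.flush()

    // Read them back in the same order; readObject dispatches on the type tag.
    val dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray))
    val i = SerDe.readObject(dis)   // java.lang.Integer(42)
    val s = SerDe.readObject(dis)   // "hello"
    val d = SerDe.readObject(dis)   // java.lang.Double(3.14)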

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
new file mode 100644
index 0000000..e99779f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io._
+import java.util.concurrent.{Semaphore, TimeUnit}
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.api.r.RBackend
+import org.apache.spark.util.RedirectThread
+
+/**
+ * Main class used to launch SparkR applications using spark-submit. It executes R as a
+ * subprocess and then has it connect back to the JVM to access system properties etc.
+ */
+object RRunner {
+  def main(args: Array[String]): Unit = {
+    val rFile = PythonRunner.formatPath(args(0))
+
+    val otherArgs = args.slice(1, args.length)
+
+    // Time to wait for SparkR backend to initialize in seconds
+    val backendTimeout = sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt
+    val rCommand = "Rscript"
+
+    // Check if the file path exists.
+    // If not, change directory to current working directory for YARN cluster mode
+    val rF = new File(rFile)
+    val rFileNormalized = if (!rF.exists()) {
+      new Path(rFile).getName
+    } else {
+      rFile
+    }
+
+    // Launch a SparkR backend server for the R process to connect to; this will let it see our
+    // Java system properties etc.
+    val sparkRBackend = new RBackend()
+    @volatile var sparkRBackendPort = 0
+    val initialized = new Semaphore(0)
+    val sparkRBackendThread = new Thread("SparkR backend") {
+      override def run() {
+        sparkRBackendPort = sparkRBackend.init()
+        initialized.release()
+        sparkRBackend.run()
+      }
+    }
+
+    sparkRBackendThread.start()
+    // Wait for RBackend initialization to finish
+    if (initialized.tryAcquire(backendTimeout, TimeUnit.SECONDS)) {
+      // Launch R
+      val returnCode = try {
+        val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs)
+        val env = builder.environment()
+        env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
+        val sparkHome = System.getenv("SPARK_HOME")
+        env.put("R_PROFILE_USER",
+          Seq(sparkHome, "R", "lib", "SparkR", "profile", "general.R").mkString(File.separator))
+        builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
+        val process = builder.start()
+
+        new RedirectThread(process.getInputStream, System.out, "redirect R output").start()
+
+        process.waitFor()
+      } finally {
+        sparkRBackend.close()
+      }
+      System.exit(returnCode)
+    } else {
+      System.err.println("SparkR backend did not initialize in " + 
backendTimeout + " seconds")
+      System.exit(-1)
+    }
+  }
+}
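
The RRunner handshake above comes down to a semaphore that the backend thread releases once init() has bound a port, while the launcher waits on it for SPARKR_BACKEND_TIMEOUT seconds. A self-contained sketch of that pattern, with a placeholder value standing in for the real RBackend.init():

    import java.util.concurrent.{Semaphore, TimeUnit}

    object BackendHandshakeSketch {
      @volatile private var port = 0            // stands in for sparkRBackendPort
      private val initialized = new Semaphore(0)

      def main(args: Array[String]): Unit = {
        val backendThread = new Thread("backend") {
          override def run(): Unit = {
            port = 12345                        // stand-in for sparkRBackend.init()
            initialized.release()               // signal the launcher that the port is known
            // the real code then blocks in sparkRBackend.run() serving requests
          }
        }
        backendThread.start()

        // Launcher side: wait up to the configured timeout for initialization.
        if (initialized.tryAcquire(120, TimeUnit.SECONDS)) {
          println(s"backend ready on port $port")  // RRunner exports this as EXISTING_SPARKR_BACKEND_PORT
        } else {
          println("backend did not initialize in time")
        }
      }
    }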

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 660307d..60bc243 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -77,6 +77,7 @@ object SparkSubmit {
   // Special primary resource names that represent shells rather than application jars.
   private val SPARK_SHELL = "spark-shell"
   private val PYSPARK_SHELL = "pyspark-shell"
+  private val SPARKR_SHELL = "sparkr-shell"
 
   private val CLASS_NOT_FOUND_EXIT_STATUS = 101
 
@@ -284,6 +285,13 @@ object SparkSubmit {
       }
     }
 
+    // Require all R files to be local
+    if (args.isR && !isYarnCluster) {
+      if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
+        printErrorAndExit(s"Only local R files are supported: 
$args.primaryResource")
+      }
+    }
+
     // The following modes are not supported or applicable
     (clusterManager, deployMode) match {
       case (MESOS, CLUSTER) =>
@@ -291,6 +299,9 @@ object SparkSubmit {
       case (STANDALONE, CLUSTER) if args.isPython =>
         printErrorAndExit("Cluster deploy mode is currently not supported for 
python " +
           "applications on standalone clusters.")
+      case (STANDALONE, CLUSTER) if args.isR =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for 
R " +
+          "applications on standalone clusters.")
       case (_, CLUSTER) if isShell(args.primaryResource) =>
         printErrorAndExit("Cluster deploy mode is not applicable to Spark 
shells.")
       case (_, CLUSTER) if isSqlShell(args.mainClass) =>
@@ -317,11 +328,32 @@ object SparkSubmit {
       }
     }
 
-    // In yarn-cluster mode for a python app, add primary resource and pyFiles to files
-    // that can be distributed with the job
-    if (args.isPython && isYarnCluster) {
-      args.files = mergeFileLists(args.files, args.primaryResource)
-      args.files = mergeFileLists(args.files, args.pyFiles)
+    // If we're running an R app, set the main class to our specific R runner
+    if (args.isR && deployMode == CLIENT) {
+      if (args.primaryResource == SPARKR_SHELL) {
+        args.mainClass = "org.apache.spark.api.r.RBackend"
+      } else {
+        // If an R file is provided, add it to the child arguments and list of files to deploy.
+        // Usage: RRunner <main R file> [app arguments]
+        args.mainClass = "org.apache.spark.deploy.RRunner"
+        args.childArgs = ArrayBuffer(args.primaryResource) ++ args.childArgs
+        args.files = mergeFileLists(args.files, args.primaryResource)
+      }
+    }
+
+    if (isYarnCluster) {
+      // In yarn-cluster mode for a python app, add primary resource and pyFiles to files
+      // that can be distributed with the job
+      if (args.isPython) {
+        args.files = mergeFileLists(args.files, args.primaryResource)
+        args.files = mergeFileLists(args.files, args.pyFiles)
+      }
+
+      // In yarn-cluster mode for an R app, add primary resource to files
+      // that can be distributed with the job
+      if (args.isR) {
+        args.files = mergeFileLists(args.files, args.primaryResource)
+      }
     }
 
     // Special flag to avoid deprecation warnings at the client
@@ -405,8 +437,8 @@ object SparkSubmit {
 
     // Add the application jar automatically so the user doesn't have to call sc.addJar
     // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
-    // For python files, the primary resource is already distributed as a regular file
-    if (!isYarnCluster && !args.isPython) {
+    // For python and R files, the primary resource is already distributed as a regular file
+    if (!isYarnCluster && !args.isPython && !args.isR) {
       var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
       if (isUserJar(args.primaryResource)) {
         jars = jars ++ Seq(args.primaryResource)
@@ -447,6 +479,10 @@ object SparkSubmit {
           childArgs += ("--py-files", pyFilesNames)
         }
         childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
+      } else if (args.isR) {
+        val mainFile = new Path(args.primaryResource).getName
+        childArgs += ("--primary-r-file", mainFile)
+        childArgs += ("--class", "org.apache.spark.deploy.RRunner")
       } else {
         if (args.primaryResource != SPARK_INTERNAL) {
           childArgs += ("--jar", args.primaryResource)
@@ -591,15 +627,15 @@ object SparkSubmit {
   /**
    * Return whether the given primary resource represents a user jar.
    */
-  private def isUserJar(primaryResource: String): Boolean = {
-    !isShell(primaryResource) && !isPython(primaryResource) && !isInternal(primaryResource)
+  private[deploy] def isUserJar(res: String): Boolean = {
+    !isShell(res) && !isPython(res) && !isInternal(res) && !isR(res)
   }
 
   /**
    * Return whether the given primary resource represents a shell.
    */
-  private[deploy] def isShell(primaryResource: String): Boolean = {
-    primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
+  private[deploy] def isShell(res: String): Boolean = {
+    (res == SPARK_SHELL || res == PYSPARK_SHELL || res == SPARKR_SHELL)
   }
 
   /**
@@ -619,12 +655,19 @@ object SparkSubmit {
   /**
    * Return whether the given primary resource requires running python.
    */
-  private[deploy] def isPython(primaryResource: String): Boolean = {
-    primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
+  private[deploy] def isPython(res: String): Boolean = {
+    res != null && res.endsWith(".py") || res == PYSPARK_SHELL
+  }
+
+  /**
+   * Return whether the given primary resource requires running R.
+   */
+  private[deploy] def isR(res: String): Boolean = {
+    res != null && res.endsWith(".R") || res == SPARKR_SHELL
   }
 
-  private[deploy] def isInternal(primaryResource: String): Boolean = {
-    primaryResource == SPARK_INTERNAL
+  private[deploy] def isInternal(res: String): Boolean = {
+    res == SPARK_INTERNAL
   }
 
   /**
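
For illustration, the resource-classification rules added in this file can be restated as a small standalone sketch (the real helpers are private[deploy]; the SPARK_INTERNAL value is assumed here):

    // Hedged mirror of SparkSubmit's primary-resource checks, for illustration only.
    object ResourceKind {
      val SPARK_SHELL = "spark-shell"
      val PYSPARK_SHELL = "pyspark-shell"
      val SPARKR_SHELL = "sparkr-shell"
      val SPARK_INTERNAL = "spark-internal"   // assumed value of SparkSubmit.SPARK_INTERNAL

      def isShell(res: String): Boolean =
        res == SPARK_SHELL || res == PYSPARK_SHELL || res == SPARKR_SHELL
      def isPython(res: String): Boolean =
        res != null && res.endsWith(".py") || res == PYSPARK_SHELL
      def isR(res: String): Boolean =
        res != null && res.endsWith(".R") || res == SPARKR_SHELL
      def isInternal(res: String): Boolean =
        res == SPARK_INTERNAL
      def isUserJar(res: String): Boolean =
        !isShell(res) && !isPython(res) && !isInternal(res) && !isR(res)
    }

    // Example classifications:
    //   ResourceKind.isR("examples/src/main/r/pi.R")  => true
    //   ResourceKind.isR("sparkr-shell")              => true
    //   ResourceKind.isUserJar("my-app.jar")          => true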

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 6eb73c4..03ecf3f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -59,6 +59,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var verbose: Boolean = false
   var isPython: Boolean = false
   var pyFiles: String = null
+  var isR: Boolean = false
   var action: SparkSubmitAction = null
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
   var proxyUser: String = null
@@ -158,7 +159,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
 
     // Try to set main class from JAR if no --class argument is given
-    if (mainClass == null && !isPython && primaryResource != null) {
+    if (mainClass == null && !isPython && !isR && primaryResource != null) {
       val uri = new URI(primaryResource)
       val uriScheme = uri.getScheme()
 
@@ -211,9 +212,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       printUsageAndExit(-1)
     }
     if (primaryResource == null) {
-      SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or 
Python file)")
+      SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or 
Python or R file)")
     }
-    if (mainClass == null && !isPython) {
+    if (mainClass == null && SparkSubmit.isUserJar(primaryResource)) {
       SparkSubmit.printErrorAndExit("No main class set in JAR; please specify 
one with --class")
     }
     if (pyFiles != null && !isPython) {
@@ -414,6 +415,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         opt
       }
     isPython = SparkSubmit.isPython(opt)
+    isR = SparkSubmit.isR(opt)
     false
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/dev/run-tests
----------------------------------------------------------------------
diff --git a/dev/run-tests b/dev/run-tests
index 561d7fc..1b6cf78 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -236,3 +236,18 @@ echo "========================================================================="
 CURRENT_BLOCK=$BLOCK_PYSPARK_UNIT_TESTS
 
 ./python/run-tests
+
+echo ""
+echo "========================================================================="
+echo "Running SparkR tests"
+echo "========================================================================="
+
+CURRENT_BLOCK=$BLOCK_SPARKR_UNIT_TESTS
+
+if [ $(command -v R) ]; then
+  ./R/install-dev.sh
+  ./R/run-tests.sh
+else
+  echo "Ignoring SparkR tests as R was not found in PATH"
+fi
+

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/dev/run-tests-codes.sh
----------------------------------------------------------------------
diff --git a/dev/run-tests-codes.sh b/dev/run-tests-codes.sh
index 8ab6db6..154e012 100644
--- a/dev/run-tests-codes.sh
+++ b/dev/run-tests-codes.sh
@@ -25,3 +25,4 @@ readonly BLOCK_BUILD=14
 readonly BLOCK_MIMA=15
 readonly BLOCK_SPARK_UNIT_TESTS=16
 readonly BLOCK_PYSPARK_UNIT_TESTS=17
+readonly BLOCK_SPARKR_UNIT_TESTS=18

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/dev/run-tests-jenkins
----------------------------------------------------------------------
diff --git a/dev/run-tests-jenkins b/dev/run-tests-jenkins
index f10aa6b..f637283 100755
--- a/dev/run-tests-jenkins
+++ b/dev/run-tests-jenkins
@@ -210,6 +210,8 @@ done
       failing_test="Spark unit tests"
     elif [ "$test_result" -eq "$BLOCK_PYSPARK_UNIT_TESTS" ]; then
       failing_test="PySpark unit tests"
+    elif [ "$test_result" -eq "$BLOCK_SPARKR_UNIT_TESTS" ]; then
+      failing_test="SparkR unit tests"
     else
       failing_test="some tests"
     fi

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/docs/README.md
----------------------------------------------------------------------
diff --git a/docs/README.md b/docs/README.md
index 3773ea2..5852f97 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -58,13 +58,19 @@ phase, use the following sytax:
 We use Sphinx to generate Python API docs, so you will need to install it by running
 `sudo pip install sphinx`.
 
-## API Docs (Scaladoc and Sphinx)
+## knitr, devtools
+
+SparkR documentation is written using `roxygen2` and we use `knitr` and `devtools` to generate
+documentation. To install these packages you can run `install.packages(c("knitr", "devtools"))` from an
+R console.
+
+## API Docs (Scaladoc, Sphinx, roxygen2)
 
 You can build just the Spark scaladoc by running `build/sbt unidoc` from the SPARK_PROJECT_ROOT directory.
 
 Similarly, you can build just the PySpark docs by running `make html` from the
 SPARK_PROJECT_ROOT/python/docs directory. Documentation is only generated for classes that are listed as
-public in `__init__.py`.
+public in `__init__.py`. The SparkR docs can be built by running SPARK_PROJECT_ROOT/R/create-docs.sh.
 
 When you run `jekyll` in the `docs` directory, it will also copy over the scaladoc for the various
 Spark subprojects into the `docs` directory (and then also into the `_site` directory). We use a
@@ -72,5 +78,5 @@ jekyll plugin to run `build/sbt unidoc` before building the site so if you haven
 may take some time as it generates all of the scaladoc.  The jekyll plugin also generates the
 PySpark docs [Sphinx](http://sphinx-doc.org/).
 
-NOTE: To skip the step of building and copying over the Scala and Python API docs, run `SKIP_API=1
+NOTE: To skip the step of building and copying over the Scala, Python, and R API docs, run `SKIP_API=1
 jekyll`.

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/docs/_layouts/global.html
----------------------------------------------------------------------
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index 2e88b30..b92c75f 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -84,6 +84,7 @@
                                 <li><a href="api/scala/index.html#org.apache.spark.package">Scala</a></li>
                                 <li><a href="api/java/index.html">Java</a></li>
                                 <li><a href="api/python/index.html">Python</a></li>
+                                <li><a href="api/R/index.html">R</a></li>
                             </ul>
                         </li>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/docs/_plugins/copy_api_dirs.rb
----------------------------------------------------------------------
diff --git a/docs/_plugins/copy_api_dirs.rb b/docs/_plugins/copy_api_dirs.rb
index 3c626a0..0ea3f8e 100644
--- a/docs/_plugins/copy_api_dirs.rb
+++ b/docs/_plugins/copy_api_dirs.rb
@@ -78,5 +78,18 @@ if not (ENV['SKIP_API'] == '1' or ENV['SKIP_SCALADOC'] == '1')
   puts "cp -r python/docs/_build/html/. docs/api/python"
   cp_r("python/docs/_build/html/.", "docs/api/python")
 
-  cd("..")
+  # Build SparkR API docs
+  puts "Moving to R directory and building roxygen docs."
+  cd("R")
+  puts `./create-docs.sh`
+
+  puts "Moving back into home dir."
+  cd("../")
+
+  puts "Making directory api/R"
+  mkdir_p "docs/api/R"
+
+  puts "cp -r R/pkg/html/. docs/api/R"
+  cp_r("R/pkg/html/.", "docs/api/R")
+
 end

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/examples/src/main/r/kmeans.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/kmeans.R b/examples/src/main/r/kmeans.R
new file mode 100644
index 0000000..6e6b5cb
--- /dev/null
+++ b/examples/src/main/r/kmeans.R
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(SparkR)
+
+# K-means clustering in Spark.
+# Note: unlike the example in Scala, a point here is represented as a vector of
+# doubles.
+
+parseVectors <-  function(lines) {
+  lines <- strsplit(as.character(lines) , " ", fixed = TRUE)
+  list(matrix(as.numeric(unlist(lines)), ncol = length(lines[[1]])))
+}
+
+dist.fun <- function(P, C) {
+  apply(
+    C,
+    1, 
+    function(x) { 
+      colSums((t(P) - x)^2)
+    }
+  )
+}
+
+closestPoint <-  function(P, C) {
+  max.col(-dist.fun(P, C))
+}
+# Main program
+
+args <- commandArgs(trailing = TRUE) 
+
+if (length(args) != 3) {
+  print("Usage: kmeans <file> <K> <convergeDist>")
+  q("no")
+}
+
+sc <- sparkR.init(appName = "RKMeans")
+K <- as.integer(args[[2]])
+convergeDist <- as.double(args[[3]])
+
+lines <- textFile(sc, args[[1]])
+points <- cache(lapplyPartition(lines, parseVectors))
+# kPoints <- take(points, K)
+kPoints <- do.call(rbind, takeSample(points, FALSE, K, 16189L))
+tempDist <- 1.0
+
+while (tempDist > convergeDist) {
+  closest <- lapplyPartition(
+    lapply(points,
+           function(p) {
+             cp <- closestPoint(p, kPoints); 
+             mapply(list, unique(cp), split.data.frame(cbind(1, p), cp), SIMPLIFY=FALSE)
+           }),
+    function(x) {do.call(c, x)
+    })
+  
+  pointStats <- reduceByKey(closest,
+                            function(p1, p2) {
+                              t(colSums(rbind(p1, p2)))
+                            },
+                            2L)
+  
+  newPoints <- do.call(
+    rbind,
+    collect(lapply(pointStats,
+                   function(tup) {
+                     point.sum <- tup[[2]][, -1]
+                     point.count <- tup[[2]][, 1]
+                     point.sum/point.count
+                   })))
+  
+  D <- dist.fun(kPoints, newPoints)
+  tempDist <- sum(D[cbind(1:K, max.col(-D))])
+  kPoints <- newPoints
+  cat("Finished iteration (delta = ", tempDist, ")\n")
+}
+
+cat("Final centers:\n")
+writeLines(unlist(lapply(kPoints, paste, collapse = " ")))

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/examples/src/main/r/linear_solver_mnist.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/linear_solver_mnist.R b/examples/src/main/r/linear_solver_mnist.R
new file mode 100644
index 0000000..c864a42
--- /dev/null
+++ b/examples/src/main/r/linear_solver_mnist.R
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Instructions: https://github.com/amplab-extras/SparkR-pkg/wiki/SparkR-Example:-Digit-Recognition-on-EC2
+
+library(SparkR)
+library(Matrix)
+
+args <- commandArgs(trailing = TRUE)
+
+# number of random features; default to 1100
+D <- if (length(args) > 0) as.integer(args[[1]]) else 1100
+# number of partitions for training dataset
+trainParts <- 12
+# dimension of digits
+d <- 784
+# number of training examples
+NTrain <- 60000
+# number of test examples
+NTest <- 10000
+# scale of features
+gamma <- 4e-4
+
+sc <- sparkR.init(appName = "SparkR-LinearSolver")
+
+# You can also use HDFS path to speed things up:
+# hdfs://<master>/train-mnist-dense-with-labels.data
+file <- textFile(sc, "/data/train-mnist-dense-with-labels.data", trainParts)
+
+W <- gamma * matrix(nrow=D, ncol=d, data=rnorm(D*d))
+b <- 2 * pi * matrix(nrow=D, ncol=1, data=runif(D))
+broadcastW <- broadcast(sc, W)
+broadcastB <- broadcast(sc, b)
+
+includePackage(sc, Matrix)
+numericLines <- lapplyPartitionsWithIndex(file,
+                       function(split, part) {
+                         matList <- sapply(part, function(line) {
+                           as.numeric(strsplit(line, ",", fixed=TRUE)[[1]])
+                         }, simplify=FALSE)
+                         mat <- Matrix(ncol=d+1, data=unlist(matList, F, F),
+                                       sparse=T, byrow=T)
+                         mat
+                       })
+
+featureLabels <- cache(lapplyPartition(
+    numericLines,
+    function(part) {
+      label <- part[,1]
+      mat <- part[,-1]
+      ones <- rep(1, nrow(mat))
+      features <- cos(
+        mat %*% t(value(broadcastW)) + (matrix(ncol=1, data=ones) %*% t(value(broadcastB))))
+      onesMat <- Matrix(ones)
+      featuresPlus <- cBind(features, onesMat)
+      labels <- matrix(nrow=nrow(mat), ncol=10, data=-1)
+      for (i in 1:nrow(mat)) {
+        labels[i, label[i]] <- 1
+      }
+      list(label=labels, features=featuresPlus)
+  }))
+
+FTF <- Reduce("+", collect(lapplyPartition(featureLabels,
+    function(part) {
+      t(part$features) %*% part$features
+    }), flatten=F))
+
+FTY <- Reduce("+", collect(lapplyPartition(featureLabels,
+    function(part) {
+      t(part$features) %*% part$label
+    }), flatten=F))
+
+# solve for the coefficient matrix
+C <- solve(FTF, FTY)
+
+test <- Matrix(as.matrix(read.csv("/data/test-mnist-dense-with-labels.data",
+                         header=F), sparse=T))
+testData <- test[,-1]
+testLabels <- matrix(ncol=1, test[,1])
+
+err <- 0
+
+# construct the feature maps for all test examples
+featuresTest <- cos(testData %*% t(value(broadcastW)) +
+    (matrix(ncol=1, data=rep(1, NTest)) %*% t(value(broadcastB))))
+featuresTest <- cBind(featuresTest, Matrix(rep(1, NTest)))
+
+# extract the one vs. all assignment
+results <- featuresTest %*% C
+labelsGot <- apply(results, 1, which.max)
+err <- sum(testLabels != labelsGot) / nrow(testLabels)
+
+cat("\nFinished running. The error rate is: ", err, ".\n")

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/examples/src/main/r/logistic_regression.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/logistic_regression.R b/examples/src/main/r/logistic_regression.R
new file mode 100644
index 0000000..2a86aa9
--- /dev/null
+++ b/examples/src/main/r/logistic_regression.R
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(SparkR)
+
+args <- commandArgs(trailing = TRUE)
+
+if (length(args) != 3) {
+  print("Usage: logistic_regression <file> <iters> <dimension>")
+  q("no")
+}
+
+# Initialize Spark context
+sc <- sparkR.init(appName = "LogisticRegressionR")
+iterations <- as.integer(args[[2]])
+D <- as.integer(args[[3]])
+
+readPartition <- function(part){
+  part = strsplit(part, " ", fixed = T)
+  list(matrix(as.numeric(unlist(part)), ncol = length(part[[1]])))
+}
+
+# Read data points and convert each partition to a matrix
+points <- cache(lapplyPartition(textFile(sc, args[[1]]), readPartition))
+
+# Initialize w to a random value
+w <- runif(n=D, min = -1, max = 1)
+cat("Initial w: ", w, "\n")
+
+# Compute logistic regression gradient for a matrix of data points
+gradient <- function(partition) {
+  partition = partition[[1]]
+  Y <- partition[, 1]  # point labels (first column of input file)
+  X <- partition[, -1] # point coordinates
+
+  # For each point (x, y), compute gradient function
+  dot <- X %*% w
+  logit <- 1 / (1 + exp(-Y * dot))
+  grad <- t(X) %*% ((logit - 1) * Y)
+  list(grad)
+}
+
+for (i in 1:iterations) {
+  cat("On iteration ", i, "\n")
+  w <- w - reduce(lapplyPartition(points, gradient), "+")
+}
+
+cat("Final w: ", w, "\n")

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/examples/src/main/r/pi.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/pi.R b/examples/src/main/r/pi.R
new file mode 100644
index 0000000..aa7a833
--- /dev/null
+++ b/examples/src/main/r/pi.R
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(SparkR)
+
+args <- commandArgs(trailing = TRUE)
+
+sc <- sparkR.init(appName = "PiR")
+
+slices <- if (length(args) > 1) as.integer(args[[2]]) else 2
+
+n <- 100000 * slices
+
+piFunc <- function(elem) {
+  rands <- runif(n = 2, min = -1, max = 1)
+  val <- ifelse((rands[1]^2 + rands[2]^2) < 1, 1.0, 0.0)
+  val
+}
+
+
+piFuncVec <- function(elems) {
+  message(length(elems))
+  rands1 <- runif(n = length(elems), min = -1, max = 1)
+  rands2 <- runif(n = length(elems), min = -1, max = 1)
+  val <- ifelse((rands1^2 + rands2^2) < 1, 1.0, 0.0)
+  sum(val)
+}
+
+rdd <- parallelize(sc, 1:n, slices)
+count <- reduce(lapplyPartition(rdd, piFuncVec), sum)
+cat("Pi is roughly", 4.0 * count / n, "\n")
+cat("Num elements in RDD ", count(rdd), "\n")

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/examples/src/main/r/wordcount.R
----------------------------------------------------------------------
diff --git a/examples/src/main/r/wordcount.R b/examples/src/main/r/wordcount.R
new file mode 100644
index 0000000..b734cb0
--- /dev/null
+++ b/examples/src/main/r/wordcount.R
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(SparkR)
+
+args <- commandArgs(trailing = TRUE)
+
+if (length(args) != 1) {
+  print("Usage: wordcount <file>")
+  q("no")
+}
+
+# Initialize Spark context
+sc <- sparkR.init(appName = "RwordCount")
+lines <- textFile(sc, args[[1]])
+
+words <- flatMap(lines,
+                 function(line) {
+                   strsplit(line, " ")[[1]]
+                 })
+wordCount <- lapply(words, function(word) { list(word, 1L) })
+
+counts <- reduceByKey(wordCount, "+", 2L)
+output <- collect(counts)
+
+for (wordcount in output) {
+  cat(wordcount[[1]], ": ", wordcount[[2]], "\n")
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
index 9b04732..f4ebc25 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
@@ -274,14 +274,14 @@ class CommandBuilderUtils {
   }
 
   /**
-   * Quotes a string so that it can be used in a command string and be parsed back into a single
-   * argument by python's "shlex.split()" function.
-   *
+   * Quotes a string so that it can be used in a command string.
    * Basically, just add simple escapes. E.g.:
    *    original single argument : ab "cd" ef
    *    after: "ab \"cd\" ef"
+   *
+   * This can be parsed back into a single argument by python's "shlex.split()" function.
    */
-  static String quoteForPython(String s) {
+  static String quoteForCommandString(String s) {
     StringBuilder quoted = new StringBuilder().append('"');
     for (int i = 0; i < s.length(); i++) {
       int cp = s.codePointAt(i);

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 91dcf70..a73c9c8 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -17,14 +17,9 @@
 
 package org.apache.spark.launcher;
 
+import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 import static org.apache.spark.launcher.CommandBuilderUtils.*;
 
@@ -54,6 +49,20 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
   static final String PYSPARK_SHELL_RESOURCE = "pyspark-shell";
 
   /**
+   * Name of the app resource used to identify the SparkR shell. The command line parser expects
+   * the resource name to be the very first argument to spark-submit in this case.
+   *
+   * NOTE: this cannot be "sparkr-shell" since that identifies the SparkR shell to SparkSubmit
+   * (see sparkR.R), and can cause this code to enter into an infinite loop.
+   */
+  static final String SPARKR_SHELL = "sparkr-shell-main";
+
+  /**
+   * This is the actual resource name that identifies the SparkR shell to SparkSubmit.
+   */
+  static final String SPARKR_SHELL_RESOURCE = "sparkr-shell";
+
+  /**
    * This map must match the class names for available special classes, since this modifies the way
    * command line parsing works. This maps the class name to the resource to use when calling
    * spark-submit.
@@ -87,6 +96,10 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
       this.allowsMixedArguments = true;
       appResource = PYSPARK_SHELL_RESOURCE;
       submitArgs = args.subList(1, args.size());
+    } else if (args.size() > 0 && args.get(0).equals(SPARKR_SHELL)) {
+      this.allowsMixedArguments = true;
+      appResource = SPARKR_SHELL_RESOURCE;
+      submitArgs = args.subList(1, args.size());
     } else {
       this.allowsMixedArguments = false;
     }
@@ -98,6 +111,8 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
   public List<String> buildCommand(Map<String, String> env) throws IOException {
     if (PYSPARK_SHELL_RESOURCE.equals(appResource)) {
       return buildPySparkShellCommand(env);
+    } else if (SPARKR_SHELL_RESOURCE.equals(appResource)) {
+      return buildSparkRCommand(env);
     } else {
       return buildSparkSubmitCommand(env);
     }
@@ -213,36 +228,62 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
       return buildCommand(env);
     }
 
-    // When launching the pyspark shell, the spark-submit arguments should be stored in the
-    // PYSPARK_SUBMIT_ARGS env variable. The executable is the PYSPARK_DRIVER_PYTHON env variable
-    // set by the pyspark script, followed by PYSPARK_DRIVER_PYTHON_OPTS.
     checkArgument(appArgs.isEmpty(), "pyspark does not support any application options.");
 
+    // When launching the pyspark shell, the spark-submit arguments should be stored in the
+    // PYSPARK_SUBMIT_ARGS env variable.
+    constructEnvVarArgs(env, "PYSPARK_SUBMIT_ARGS");
+
+    // The executable is the PYSPARK_DRIVER_PYTHON env variable set by the pyspark script,
+    // followed by PYSPARK_DRIVER_PYTHON_OPTS.
+    List<String> pyargs = new ArrayList<String>();
+    pyargs.add(firstNonEmpty(System.getenv("PYSPARK_DRIVER_PYTHON"), "python"));
+    String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
+    if (!isEmpty(pyOpts)) {
+      pyargs.addAll(parseOptionString(pyOpts));
+    }
+
+    return pyargs;
+  }
+
+  private List<String> buildSparkRCommand(Map<String, String> env) throws IOException {
+    if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".R")) {
+      appResource = appArgs.get(0);
+      appArgs.remove(0);
+      return buildCommand(env);
+    }
+    // When launching the SparkR shell, store the spark-submit arguments in the SPARKR_SUBMIT_ARGS
+    // env variable.
+    constructEnvVarArgs(env, "SPARKR_SUBMIT_ARGS");
+
+    // Set shell.R as R_PROFILE_USER to load the SparkR package when the shell comes up.
+    String sparkHome = System.getenv("SPARK_HOME");
+    env.put("R_PROFILE_USER",
+            join(File.separator, sparkHome, "R", "lib", "SparkR", "profile", "shell.R"));
+
+    List<String> args = new ArrayList<String>();
+    args.add(firstNonEmpty(System.getenv("SPARKR_DRIVER_R"), "R"));
+    return args;
+  }
+
+  private void constructEnvVarArgs(
+      Map<String, String> env,
+      String submitArgsEnvVariable) throws IOException {
     Properties props = loadPropertiesFile();
     mergeEnvPathList(env, getLibPathEnvName(),
       firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, conf, props));
 
-    // Store spark-submit arguments in an environment variable, since there's no way to pass
-    // them to shell.py on the comand line.
     StringBuilder submitArgs = new StringBuilder();
     for (String arg : buildSparkSubmitArgs()) {
       if (submitArgs.length() > 0) {
         submitArgs.append(" ");
       }
-      submitArgs.append(quoteForPython(arg));
+      submitArgs.append(quoteForCommandString(arg));
     }
-    env.put("PYSPARK_SUBMIT_ARGS", submitArgs.toString());
-
-    List<String> pyargs = new ArrayList<String>();
-    pyargs.add(firstNonEmpty(System.getenv("PYSPARK_DRIVER_PYTHON"), "python"));
-    String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
-    if (!isEmpty(pyOpts)) {
-      pyargs.addAll(parseOptionString(pyOpts));
-    }
-
-    return pyargs;
+    env.put(submitArgsEnvVariable, submitArgs.toString());
   }
 
+
   private boolean isClientMode(Properties userProps) {
     String userMaster = firstNonEmpty(master, (String) userProps.get(SparkLauncher.SPARK_MASTER));
     // Default master is "local[*]", so assume client mode in that case.

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java b/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java
index dba0203..1ae42ee 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java
@@ -79,9 +79,9 @@ public class CommandBuilderUtilsSuite {
 
   @Test
   public void testPythonArgQuoting() {
-    assertEquals("\"abc\"", quoteForPython("abc"));
-    assertEquals("\"a b c\"", quoteForPython("a b c"));
-    assertEquals("\"a \\\"b\\\" c\"", quoteForPython("a \"b\" c"));
+    assertEquals("\"abc\"", quoteForCommandString("abc"));
+    assertEquals("\"a b c\"", quoteForCommandString("a b c"));
+    assertEquals("\"a \\\"b\\\" c\"", quoteForCommandString("a \"b\" c"));
   }
 
   private void testOpt(String opts, List<String> expected) {
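
Based only on the expectations in this test, an illustrative Scala sketch of the quoting behaviour follows (wrap in double quotes and backslash-escape embedded quotes); the real Java implementation may escape additional characters such as backslashes, which is not shown here:

    // Not the launcher's implementation; a sketch matching the three assertions above.
    def quoteLikeCommandString(s: String): String =
      "\"" + s.flatMap {
        case '"' => "\\\""      // embedded double quotes become \"
        case c   => c.toString
      } + "\""

    assert(quoteLikeCommandString("abc") == "\"abc\"")
    assert(quoteLikeCommandString("a b c") == "\"a b c\"")
    assert(quoteLikeCommandString("a \"b\" c") == "\"a \\\"b\\\" c\"")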

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 42bd926..70e297c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1749,5 +1749,8 @@
     <profile>
       <id>parquet-provided</id>
     </profile>
+    <profile>
+      <id>sparkr</id>
+    </profile>
   </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index a5e6b63..53ad673 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.types.NumericType
 @Experimental
 class GroupedData protected[sql](df: DataFrame, groupingExprs: Seq[Expression]) {
 
-  private[this] implicit def toDF(aggExprs: Seq[NamedExpression]): DataFrame = {
+  private[sql] implicit def toDF(aggExprs: Seq[NamedExpression]): DataFrame = {
     val namedGroupingExprs = groupingExprs.map {
       case expr: NamedExpression => expr
       case expr: Expression => Alias(expr, expr.prettyString)()

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
new file mode 100644
index 0000000..d1ea7cc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.r
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
+
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.api.r.SerDe
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.{Column, DataFrame, GroupedData, Row, SQLContext, SaveMode}
+
+private[r] object SQLUtils {
+  def createSQLContext(jsc: JavaSparkContext): SQLContext = {
+    new SQLContext(jsc)
+  }
+
+  def getJavaSparkContext(sqlCtx: SQLContext): JavaSparkContext = {
+    new JavaSparkContext(sqlCtx.sparkContext)
+  }
+
+  def toSeq[T](arr: Array[T]): Seq[T] = {
+    arr.toSeq
+  }
+
+  def createDF(rdd: RDD[Array[Byte]], schemaString: String, sqlContext: SQLContext): DataFrame = {
+    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
+    val num = schema.fields.size
+    val rowRDD = rdd.map(bytesToRow)
+    sqlContext.createDataFrame(rowRDD, schema)
+  }
+
+  // A helper to include grouping columns in Agg()
+  def aggWithGrouping(gd: GroupedData, exprs: Column*): DataFrame = {
+    val aggExprs = exprs.map { col =>
+      col.expr match {
+        case expr: NamedExpression => expr
+        case expr: Expression => Alias(expr, expr.simpleString)()
+      }
+    }
+    gd.toDF(aggExprs)
+  }
+
+  def dfToRowRDD(df: DataFrame): JavaRDD[Array[Byte]] = {
+    df.map(r => rowToRBytes(r))
+  }
+
+  private[this] def bytesToRow(bytes: Array[Byte]): Row = {
+    val bis = new ByteArrayInputStream(bytes)
+    val dis = new DataInputStream(bis)
+    val num = SerDe.readInt(dis)
+    Row.fromSeq((0 until num).map { i =>
+      SerDe.readObject(dis)
+    }.toSeq)
+  }
+
+  private[this] def rowToRBytes(row: Row): Array[Byte] = {
+    val bos = new ByteArrayOutputStream()
+    val dos = new DataOutputStream(bos)
+
+    SerDe.writeInt(dos, row.length)
+    (0 until row.length).map { idx =>
+      val obj: Object = row(idx).asInstanceOf[Object]
+      SerDe.writeObject(dos, obj)
+    }
+    bos.toByteArray()
+  }
+
+  def dfToCols(df: DataFrame): Array[Array[Byte]] = {
+    // localDF is Array[Row]
+    val localDF = df.collect()
+    val numCols = df.columns.length
+    // dfCols is Array[Array[Any]]
+    val dfCols = convertRowsToColumns(localDF, numCols)
+
+    dfCols.map { col =>
+      colToRBytes(col)
+    } 
+  }
+
+  def convertRowsToColumns(localDF: Array[Row], numCols: Int): Array[Array[Any]] = {
+    (0 until numCols).map { colIdx =>
+      localDF.map { row =>
+        row(colIdx)
+      }
+    }.toArray
+  }
+
+  def colToRBytes(col: Array[Any]): Array[Byte] = {
+    val numRows = col.length
+    val bos = new ByteArrayOutputStream()
+    val dos = new DataOutputStream(bos)
+    
+    SerDe.writeInt(dos, numRows)
+
+    col.map { item =>
+      val obj: Object = item.asInstanceOf[Object]
+      SerDe.writeObject(dos, obj)
+    }
+    bos.toByteArray()
+  }
+
+  def saveMode(mode: String): SaveMode = {
+    mode match {
+      case "append" => SaveMode.Append
+      case "overwrite" => SaveMode.Overwrite
+      case "error" => SaveMode.ErrorIfExists
+      case "ignore" => SaveMode.Ignore
+    }
+  }
+}
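
For context on the byte framing used by rowToRBytes/bytesToRow and colToRBytes above (an element count followed by each value written through SerDe), here is a rough, self-contained sketch. It is simplified to doubles only and uses plain java.io rather than the private[spark] SerDe; ColumnFramingSketch and its methods are illustrative names, not part of this patch.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object ColumnFramingSketch {
  // Write the count first, then each value, mirroring colToRBytes above
  // (which delegates per-value writing to SerDe.writeObject).
  def writeDoubles(values: Array[Double]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val dos = new DataOutputStream(bos)
    dos.writeInt(values.length)
    values.foreach(dos.writeDouble)
    dos.flush()
    bos.toByteArray
  }

  // Read the count, then that many values, mirroring bytesToRow above.
  def readDoubles(bytes: Array[Byte]): Array[Double] = {
    val dis = new DataInputStream(new ByteArrayInputStream(bytes))
    val n = dis.readInt()
    Array.fill(n)(dis.readDouble())
  }

  def main(args: Array[String]): Unit = {
    // Round-trips the framed bytes and prints "1.0, 2.5, 3.0".
    println(readDoubles(writeDoubles(Array(1.0, 2.5, 3.0))).mkString(", "))
  }
}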

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 24a1e02..32bc4e5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -469,6 +469,9 @@ private[spark] class ApplicationMaster(
       System.setProperty("spark.submit.pyFiles",
         PythonRunner.formatPaths(args.pyFiles).mkString(","))
     }
+    if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
+      // TODO(davies): add R dependencies here
+    }
     val mainMethod = userClassLoader.loadClass(args.userClass)
       .getMethod("main", classOf[Array[String]])
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index e1a992a..ae6dc10 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -25,6 +25,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var userJar: String = null
   var userClass: String = null
   var primaryPyFile: String = null
+  var primaryRFile: String = null
   var pyFiles: String = null
   var userArgs: Seq[String] = Seq[String]()
   var executorMemory = 1024
@@ -54,6 +55,10 @@ class ApplicationMasterArguments(val args: Array[String]) {
           primaryPyFile = value
           args = tail
 
+        case ("--primary-r-file") :: value :: tail =>
+          primaryRFile = value
+          args = tail
+
         case ("--py-files") :: value :: tail =>
           pyFiles = value
           args = tail
@@ -79,6 +84,11 @@ class ApplicationMasterArguments(val args: Array[String]) {
       }
     }
 
+    if (primaryPyFile != null && primaryRFile != null) {
+      System.err.println("Cannot have primary-py-file and primary-r-file at 
the same time")
+      System.exit(-1)
+    }
+
     userArgs = userArgsBuffer.readOnly
   }
 
@@ -92,6 +102,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
       |  --jar JAR_PATH       Path to your application's JAR file
       |  --class CLASS_NAME   Name of your application's main class
       |  --primary-py-file    A main Python file
+      |  --primary-r-file     A main R file
       |  --py-files PY_FILES  Comma-separated list of .zip, .egg, or .py files to
       |                       place on the PYTHONPATH for Python apps.
       |  --args ARGS          Arguments to be passed to your application's main class.
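
The new --primary-r-file flag is parsed with the same cons-pattern matching over List[String] that the existing options use. Below is a rough, self-contained sketch of that pattern; ArgParseSketch and the reduced option set are illustrative only, not Spark code.

object ArgParseSketch {
  // Options are consumed as ("--flag" :: value :: rest); the loop then
  // recurses on the remaining tail, like the parsing loops in the patch above.
  @annotation.tailrec
  private def loop(args: List[String], acc: Map[String, String]): Map[String, String] =
    args match {
      case "--primary-py-file" :: value :: tail => loop(tail, acc + ("primaryPyFile" -> value))
      case "--primary-r-file" :: value :: tail  => loop(tail, acc + ("primaryRFile" -> value))
      case Nil                                  => acc
      case unknown :: _ =>
        throw new IllegalArgumentException(s"Unknown or incomplete option: $unknown")
    }

  def parse(argv: Array[String]): Map[String, String] = loop(argv.toList, Map.empty)

  def main(args: Array[String]): Unit = {
    // Prints Map(primaryRFile -> job.R)
    println(parse(Array("--primary-r-file", "job.R")))
  }
}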

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7219852..c1effd3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -491,6 +491,12 @@ private[spark] class Client(
       } else {
         Nil
       }
+    val primaryRFile =
+      if (args.primaryRFile != null) {
+        Seq("--primary-r-file", args.primaryRFile)
+      } else {
+        Nil
+      }
     val amClass =
       if (isClusterMode) {
         Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
@@ -500,12 +506,15 @@ private[spark] class Client(
     if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
       args.userArgs = ArrayBuffer(args.primaryPyFile, args.pyFiles) ++ args.userArgs
     }
+    if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
+      args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
+    }
     val userArgs = args.userArgs.flatMap { arg =>
       Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
     }
     val amArgs =
-      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ userArgs ++
-        Seq(
+      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ primaryRFile ++
+        userArgs ++ Seq(
           "--executor-memory", args.executorMemory.toString + "m",
           "--executor-cores", args.executorCores.toString,
           "--num-executors ", args.numExecutors.toString)

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 3bc7eb1..da6798c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -32,6 +32,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   var userClass: String = null
   var pyFiles: String = null
   var primaryPyFile: String = null
+  var primaryRFile: String = null
   var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
   var executorMemory = 1024 // MB
   var executorCores = 1
@@ -150,6 +151,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
           primaryPyFile = value
           args = tail
 
+        case ("--primary-r-file") :: value :: tail =>
+          primaryRFile = value
+          args = tail
+
         case ("--args" | "--arg") :: value :: tail =>
           if (args(0) == "--args") {
             println("--args is deprecated. Use --arg instead.")
@@ -228,6 +233,11 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
           throw new IllegalArgumentException(getUsageMessage(args))
       }
     }
+
+    if (primaryPyFile != null && primaryRFile != null) {
+      throw new IllegalArgumentException("Cannot have primary-py-file and 
primary-r-file" +
+        " at the same time")
+    }
   }
 
   private def getUsageMessage(unknownParam: List[String] = null): String = {
@@ -240,6 +250,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       |                           mode)
       |  --class CLASS_NAME       Name of your application's main class (required)
       |  --primary-py-file        A main Python file
+      |  --primary-r-file         A main R file
       |  --arg ARG                Argument to be passed to your application's main class.
       |                           Multiple invocations are possible, each will be passed in order.
       |  --num-executors NUM      Number of executors to start (Default: 2)

