[GitHub] spark pull request #12575: [SPARK-14803][SQL][Optimizer] A bug in EliminateS...
Github user sun-rui closed the pull request at: https://github.com/apache/spark/pull/12575
[GitHub] spark issue #14783: SPARK-16785 R dapply doesn't return array or raw columns
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14783 @clarkfitzg, your patch is a bug fix rather than a performance improvement, right? If so, since there is no performance regression according to your benchmark, let's focus on the functionality. We can address performance issues in separate JIRA issues.
[GitHub] spark issue #14744: [SPARK-17178][SPARKR][SPARKSUBMIT] Allow to set sparkr s...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14744 LGTM
[GitHub] spark pull request #14046: [SPARK-16366][SPARKR] Fix time comparison failure...
Github user sun-rui closed the pull request at: https://github.com/apache/spark/pull/14046
[GitHub] spark issue #14744: [SPARK-17178][SPARKR][SPARKSUBMIT] Allow to set sparkr s...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14744 @felixcheung, I guess that a Spark conf is preferred over an env variable.
[GitHub] spark issue #14744: [SPARK-17178][SPARKR][SPARKSUBMIT] Allow to set sparkr s...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14744 @zjffdu, basically LGTM
[GitHub] spark pull request #14744: [SPARK-17178][SPARKR][SPARKSUBMIT] Allow to set s...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14744#discussion_r76067417 --- Diff: docs/configuration.md --- @@ -1752,6 +1752,13 @@ showDF(properties, numRows = 200, truncate = FALSE) Executable for executing R scripts in client modes for driver. Ignored in cluster modes. + + spark.r.shell.command + R + +Executable for executing R shell. --- End diff -- Maybe "Executable for executing the SparkR shell. Ignored in cluster modes."? I don't have a strong opinion on the wording; just be sure it is consistent with the existing descriptions of the other two related options. The old descriptions can be updated together.
[GitHub] spark pull request #14775: [SPARK-16581][SPARKR] Make JVM backend calling fu...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14775#discussion_r76008164 --- Diff: R/pkg/NAMESPACE --- @@ -363,4 +363,9 @@ S3method(structField, jobj) S3method(structType, jobj) S3method(structType, structField) +export("newJObject") +export("callJMethod") +export("callJStatic") +export("cleanup.jobj") --- End diff -- @aloknsingh
[GitHub] spark pull request #14783: SPARK-16785 R dapply doesn't return array or raw ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14783#discussion_r76007826 --- Diff: R/pkg/inst/worker/worker.R --- @@ -36,7 +36,14 @@ compute <- function(mode, partition, serializer, deserializer, key, # available since R 3.2.4. So we set the global option here. oldOpt <- getOption("stringsAsFactors") options(stringsAsFactors = FALSE) - inputData <- do.call(rbind.data.frame, inputData) + + # Handle binary data types + if ("raw" %in% sapply(inputData[[1]], class)) { +inputData <- SparkR:::rbindRaws(inputData) --- End diff -- Sure. "SparkR:::" is needed for private functions.
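For readers less familiar with R namespaces, a tiny illustration of why the prefix is still required for non-exported helpers; it assumes a SparkR build that already contains the private rbindRaws() helper added in this PR.

```r
# Illustrative R session, not worker.R code.
suppressPackageStartupMessages(library(SparkR))

exists("structField")   # TRUE: exported by SparkR, attached by library()
exists("rbindRaws")     # FALSE: private helper, not in the NAMESPACE exports

# The ::: operator reaches into the package namespace for private functions:
r <- serialize(1, connection = NULL)
SparkR:::rbindRaws(list(list(1L, r), list(2L, r)))
```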
[GitHub] spark pull request #14775: [SPARK-16581][SPARKR] Make JVM backend calling fu...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14775#discussion_r76007335 --- Diff: R/pkg/NAMESPACE --- @@ -363,4 +363,9 @@ S3method(structField, jobj) S3method(structType, jobj) S3method(structType, structField) +export("newJObject") +export("callJMethod") +export("callJStatic") +export("cleanup.jobj") --- End diff -- I am a little confused by the discussion at https://issues.apache.org/jira/browse/SPARK-16611: d) cleanup.jobj: SystemML uses the MLContext and matrixCharacteristic classes that are instantiated in the JVM and whose object refs are kept alive in SparkR; later, when SystemML has finished its computation, we clean up the objects. We achieve this using reference classes in R, using their finalize method to register cleanup.jobj once we have created the jobj via newJObject("sysml.class"). It seems that SystemML still relies on automatic GC to clean up jobjs, given the statement "use its finalize method to register the cleanup.jobj". But a jobj already has cleanup.jobj registered as its finalizer. If my understanding is correct, I don't see a strong reason for exposing cleanup.jobj. If active cleanup is needed, a call to "gc" can be made?
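A rough, self-contained sketch of the lifecycle being described; the jobj construction below is a simplified stand-in for illustration, not SparkR's actual internals.

```r
# Simplified stand-in for a SparkR jobj: an R reference whose finalizer
# releases the corresponding JVM-side object when R garbage-collects it.
newIllustrativeJobj <- function(objId) {
  obj <- new.env(parent = emptyenv())
  obj$id <- objId
  # In SparkR, cleanup.jobj plays the role of this finalizer.
  reg.finalizer(obj, function(o) {
    message("releasing JVM object ", o$id)  # real code would notify the backend
  })
  obj
}

ref <- newIllustrativeJobj("42")
rm(ref)
gc()  # active cleanup: forcing a GC runs pending finalizers, which is why an
      # explicit gc() call can substitute for exposing cleanup.jobj directly
```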
[GitHub] spark pull request #14783: SPARK-16785 R dapply doesn't return array or raw ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14783#discussion_r76005367 --- Diff: R/pkg/inst/worker/worker.R --- @@ -36,7 +36,14 @@ compute <- function(mode, partition, serializer, deserializer, key, # available since R 3.2.4. So we set the global option here. oldOpt <- getOption("stringsAsFactors") options(stringsAsFactors = FALSE) - inputData <- do.call(rbind.data.frame, inputData) + + # Handle binary data types + if ("raw" %in% sapply(inputData[[1]], class)) { +inputData <- SparkR:::rbindRaws(inputData) --- End diff -- No, it is not the preferred style in worker.R. It seems some changes slipped through a previous code review. {code} suppressPackageStartupMessages(library(SparkR)) {code} should be moved to the front of worker.R, so that SparkR::: can be removed. A lot of "SparkR:::" is annoying.
[GitHub] spark pull request #14783: SPARK-16785 R dapply doesn't return array or raw ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14783#discussion_r76000970 --- Diff: R/pkg/inst/worker/worker.R --- @@ -36,7 +36,14 @@ compute <- function(mode, partition, serializer, deserializer, key, # available since R 3.2.4. So we set the global option here. oldOpt <- getOption("stringsAsFactors") options(stringsAsFactors = FALSE) - inputData <- do.call(rbind.data.frame, inputData) + + # Handle binary data types + if ("raw" %in% sapply(inputData[[1]], class)) { +inputData <- SparkR:::rbindRaws(inputData) --- End diff -- same as above. SparkR::: is not needed
[GitHub] spark pull request #14783: SPARK-16785 R dapply doesn't return array or raw ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14783#discussion_r76000958 --- Diff: R/pkg/inst/tests/testthat/test_utils.R --- @@ -183,4 +183,13 @@ test_that("overrideEnvs", { expect_equal(config[["config_only"]], "ok") }) +test_that("rbindRaws", { + r <- serialize(1, connection = NULL) + inputData <- list(list(1L, r), list(2L, r), list(3L, r)) + expected <- data.frame(V1 = 1:3) + expected$V2 <- list(r, r, r) + result <- SparkR:::rbindRaws(inputData) --- End diff -- SparkR::: is not needed
[GitHub] spark pull request #14783: SPARK-16785 R dapply doesn't return array or raw ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14783#discussion_r76000918 --- Diff: R/pkg/R/SQLContext.R --- @@ -183,6 +183,8 @@ getDefaultSqlSource <- function() { # TODO(davies): support sampling and infer type from NA createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) { sparkSession <- getSparkSession() + + # Convert dataframes into a list of rows. Each row is a list --- End diff -- how about " Convert a dataframe"?
[GitHub] spark issue #14639: [SPARK-17054][SPARKR] SparkR can not run in yarn-cluster...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14639 Does this API get only the Spark SQL configurations, or does it include SparkConf as well?
[GitHub] spark issue #14639: [SPARK-17054][SPARKR] SparkR can not run in yarn-cluster...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14639 If SparkConf is needed in the future, instead of passing the whole Spark conf to R via env variables, we can expose an API for accessing SparkConf through the R backend, similar to that in PySpark. https://github.com/apache/spark/blob/master/python/pyspark/conf.py
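A rough sketch of what such an accessor could look like on the R side; sparkConfGet is a hypothetical name, and the helper assumes an active SparkR session plus the internal getSparkContext()/callJMethod plumbing.

```r
# Hypothetical helper: read a SparkConf entry through the JVM backend,
# similar in spirit to pyspark/conf.py going over Py4J.
sparkConfGet <- function(key, defaultValue = "") {
  sc <- SparkR:::getSparkContext()               # jobj for the JavaSparkContext
  conf <- SparkR:::callJMethod(sc, "getConf")    # org.apache.spark.SparkConf
  SparkR:::callJMethod(conf, "get", key, defaultValue)
}

# Usage, assuming sparkR.session() has already been called:
# sparkConfGet("spark.master", "local[*]")
```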
[GitHub] spark issue #14639: [SPARK-17054][SPARKR] SparkR can not run in yarn-cluster...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14639 I think there may be a simpler solution. As in my comment on the JIRA, the "EXISTING_SPARKR_BACKEND_PORT" env variable can be checked instead of pulling the whole Spark conf from the JVM into R.
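A minimal sketch of that check, assuming the existing convention that spark-submit exports EXISTING_SPARKR_BACKEND_PORT when it launches the backend for an R script.

```r
# If spark-submit already started the R backend, its port is in the environment;
# an empty value means the R process was started outside spark-submit.
existingPort <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "")

if (nchar(existingPort) > 0) {
  # Reuse the running backend; no need to locate or download a Spark distribution.
  backendPort <- as.integer(existingPort)
} else {
  # Standalone R session: fall back to SPARK_HOME (and possibly installing Spark).
  sparkHome <- Sys.getenv("SPARK_HOME", "")
}
```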
[GitHub] spark issue #14639: [SPARK-17054][SPARKR] SparkR can not run in yarn-cluster...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14639 @zjffdu, yes, there is no need to download Spark in yarn-client mode provided that spark-submit is used to launch an R script. I just want to verify that your change works in this case. But note that if yarn-client is used in shell mode in a manual way (not relying on spark-submit), it is possible that Spark needs to be downloaded.
[GitHub] spark issue #14639: [SPARK-17054][SPARKR] SparkR can not run in yarn-cluster...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14639 @zjffdu, does your change work when launching an R script in yarn-client mode? It seems that it won't.
[GitHub] spark issue #14639: [SPARK-17054][SPARKR] SparkR can not run in yarn-cluster...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14639 This is not only about using the correct cache dir on macOS; in yarn-cluster mode there should also be no downloading of Spark.
[GitHub] spark issue #11157: [SPARK-11714][Mesos] Make Spark on Mesos honor port rest...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/11157 Has this change been documented in the Spark on Mesos guide?
[GitHub] spark pull request #14575: [SPARK-16522][MESOS] Spark application throws exc...
Github user sun-rui closed the pull request at: https://github.com/apache/spark/pull/14575
[GitHub] spark issue #14575: [SPARK-16522][MESOS] Spark application throws exception ...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14575 @mgummelt, @srowen
[GitHub] spark pull request #14575: [SPARK-16522][MESOS] Spark application throws exc...
GitHub user sun-rui opened a pull request: https://github.com/apache/spark/pull/14575 [SPARK-16522][MESOS] Spark application throws exception on exit. This is a backport of https://github.com/apache/spark/pull/14175 to branch-2.0. You can merge this pull request into a Git repository by running: $ git pull https://github.com/sun-rui/spark SPARK-16522-branch-2.0 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14575.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14575 commit f06ab151e91073c887b4fb61d440dffb5fbce391 Author: Sun Rui <sunrui2...@gmail.com> Date: 2016-08-09T08:39:45Z [SPARK-16522][MESOS] Spark application throws exception on exit. Spark applications running on Mesos throw an exception upon exit. For details, refer to https://issues.apache.org/jira/browse/SPARK-16522. I am not sure if there is any better fix, so I will wait for review comments. Manual test. Observed that the exception is gone upon application exit. Author: Sun Rui <sunrui2...@gmail.com> Closes #14175 from sun-rui/SPARK-16522.
[GitHub] spark issue #14175: [SPARK-16522][MESOS] Spark application throws exception ...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14175 ok, will submit another PR for the 2.0 branch
[GitHub] spark pull request #14175: [SPARK-16522][MESOS] Spark application throws exc...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14175#discussion_r73823225 --- Diff: core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala --- @@ -396,6 +425,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite mesosDriver = newDriver } + override def stopExecutors(): Unit = { --- End diff -- The flag stopCalled in the backend can't be accessed because it is private. Here, by overriding stopExecutors(), we can set a flag when stop() is called. Another solution is to make the stopCalled flag in the backend public for test purposes.
[GitHub] spark pull request #14175: [SPARK-16522][MESOS] Spark application throws exc...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14175#discussion_r73823074 --- Diff: core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala --- @@ -341,6 +344,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite assert(!dockerInfo.getForcePullImage) } + test("Do not call parent methods like removeExecutor() after backend is stopped") { +setBackend() + +// launches a task on a valid offer +val offers = List((backend.executorMemory(sc), 1)) +offerResources(offers) +verifyTaskLaunched(driver, "o1") + +// launches a thread simulating status update +val statusUpdateThread = new Thread { + override def run(): Unit = { +while (!stopCalled) { + Thread.sleep(100) +} + +val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED) +backend.statusUpdate(driver, status) + } +}.start + +backend.stop --- End diff -- ok.
[GitHub] spark pull request #14175: [SPARK-16522][MESOS] Spark application throws exc...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14175#discussion_r73823040 --- Diff: core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala --- @@ -341,6 +344,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite assert(!dockerInfo.getForcePullImage) } + test("Do not call parent methods like removeExecutor() after backend is stopped") { +setBackend() + +// launches a task on a valid offer +val offers = List((backend.executorMemory(sc), 1)) +offerResources(offers) +verifyTaskLaunched(driver, "o1") + +// launches a thread simulating status update +val statusUpdateThread = new Thread { + override def run(): Unit = { +while (!stopCalled) { + Thread.sleep(100) +} + +val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED) +backend.statusUpdate(driver, status) + } +}.start + +backend.stop +// Any method of the backend involving sending messages to the driver endpoint should not +// be called after the backend is stopped. +verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]]) --- End diff -- I tried to do this, but found that driverEndpoint is mocked. Since the exception is thrown from within RemoveExecutor, I think this serves the same purpose.
[GitHub] spark issue #14175: [SPARK-16522][MESOS] Spark application throws exception ...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14175 rebased to master
[GitHub] spark issue #14175: [SPARK-16522][MESOS] Spark application throws exception ...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14175 @mgummelt, regression test case added. I am not sure whether it is the expected one.
[GitHub] spark issue #14175: [SPARK-16522][MESOS] Spark application throws exception ...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14175 @mgummelt, will do it soon
[GitHub] spark pull request #14175: [SPARK-16522][MESOS] Spark application throws exc...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14175#discussion_r72368797 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -552,7 +552,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( taskId: String, reason: String): Unit = { stateLock.synchronized { - removeExecutor(taskId, SlaveLost(reason)) + // Do not call removeExecutor() after this scheduler backend was stopped because --- End diff -- What about submitting another JIRA issue for better handling of state management after stop() is called on CoarseGrainedSchedulerBackend?
[GitHub] spark issue #14175: [SPARK-16522][MESOS] Spark application throws exception ...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14175 Sure, will add it
[GitHub] spark issue #14309: [SPARK-11977][SQL] Support accessing a column contains "...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14309 @cloud-fan, could you help review it?
[GitHub] spark pull request #14175: [SPARK-16522][MESOS] Spark application throws exc...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14175#discussion_r72176972 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -552,7 +552,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( taskId: String, reason: String): Unit = { stateLock.synchronized { - removeExecutor(taskId, SlaveLost(reason)) + // Do not call removeExecutor() after this scheduler backend was stopped because --- End diff -- @mgummelt, what's your opinion?
[GitHub] spark issue #14309: [SPARK-11977][SQL] Support accessing a column contains "...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14309 Does this solution cooperate with the "table.column" access pattern?
[GitHub] spark pull request #14309: [SPARK-11977][SQL] Support accessing a column con...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14309#discussion_r71982269 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -641,6 +641,10 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { Row(key, value, key + 1) }.toSeq) assert(df.schema.map(_.name) === Seq("key", "valueRenamed", "newCol")) + +// Renaming to a column that contains "." character +val df2 = testData.toDF().withColumnRenamed("value", "value.Renamed") +assert(df2.schema.map(_.name) === Seq("key", "value.Renamed")) --- End diff -- Please add more test cases showing that columns whose names contain '.' can be accessed without backticks.
[GitHub] spark pull request #14175: [SPARK-16522][MESOS] Spark application throws exc...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14175#discussion_r71982037 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -552,7 +552,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( taskId: String, reason: String): Unit = { stateLock.synchronized { - removeExecutor(taskId, SlaveLost(reason)) + // Do not call removeExecutor() after this scheduler backend was stopped because --- End diff -- Not only removeExecutor() but also other methods, like reviveOffers(), killTask(), etc., should not be called after the backend is stopped. If you prefer adding a comment in the parent class, it seems more complete to add comments to all methods that may encounter this case. However, I don't think that is necessary, since exceptions will be thrown in such cases, notifying the caller that such calls are not valid; that is exactly how this issue was found.
[GitHub] spark pull request #14258: [Spark-16579][SparkR] add install_spark function
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14258#discussion_r71728669 --- Diff: R/pkg/inst/extdata/spark_download.csv --- @@ -0,0 +1,2 @@ +"url","default" +"http://apache.osuosl.org",TRUE --- End diff -- I don't see a strong reason for a CSV file listing available mirrors, because we can support Apache's dynamic mirrors, and install_spark() has a parameter allowing the user to pass a vector of mirror sites.
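For illustration, a sketch of resolving an Apache dynamic mirror at install time instead of shipping a mirror list; the closer.lua JSON endpoint, its preferred field, and the use of jsonlite are assumptions made for this example.

```r
library(jsonlite)

# Ask Apache's mirror service for a nearby mirror; fall back to the archive
# if the service is unreachable.
defaultMirrorUrl <- function() {
  resp <- tryCatch(fromJSON("https://www.apache.org/dyn/closer.lua?as_json=1"),
                   error = function(e) NULL)
  if (!is.null(resp) && !is.null(resp$preferred)) {
    resp$preferred                        # e.g. "http://apache.osuosl.org/"
  } else {
    "https://archive.apache.org/dist/"
  }
}
```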
[GitHub] spark pull request #14258: [Spark-16579][SparkR] add install_spark function
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14258#discussion_r71726598 --- Diff: R/pkg/R/install.R --- @@ -0,0 +1,160 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Functions to install Spark in case the user directly downloads SparkR +# from CRAN. + +#' Download and Install Spark Core to Local Directory +#' +#' \code{install_spark} downloads and installs Spark to local directory if +#' it is not found. The Spark version we use is 2.0.0 (preview). Users can +#' specify a desired Hadoop version, the remote site, and the directory where +#' the package is installed locally. +#' +#' @param hadoop_version Version of Hadoop to install, 2.4, 2.6, +#'2.7 (default) and without +#' @param mirror_url the base URL of the repositories to use +#' @param local_dir local directory that Spark is installed to +#' @return \code{install_spark} returns the local directory +#' where Spark is found or installed +#' @rdname install_spark +#' @name install_spark +#' @export +#' @examples +#'\dontrun{ +#' install_spark() +#'} +#' @note install_spark since 2.1.0 +install_spark <- function(hadoop_version = NULL, mirror_url = NULL, + local_dir = NULL) { + version <- paste0("spark-", spark_version_default()) + hadoop_version <- match.arg(hadoop_version, supported_versions_hadoop()) + packageName <- ifelse(hadoop_version == "without", +paste0(version, "-bin-without-hadoop"), +paste0(version, "-bin-hadoop", hadoop_version)) + if (is.null(local_dir)) { +local_dir <- getOption("spark.install.dir", spark_cache_path()) + } else { +local_dir <- normalizePath(local_dir) + } + + packageLocalDir <- file.path(local_dir, packageName) + + if (dir.exists(packageLocalDir)) { +fmt <- "Spark %s for Hadoop %s has been installed." +msg <- sprintf(fmt, version, hadoop_version) +message(msg) +return(invisible(packageLocalDir)) + } + + packageLocalPath <- paste0(packageLocalDir, ".tgz") + tarExists <- file.exists(packageLocalPath) + + if (tarExists) { +message("Tar file found. Installing...") + } else { +dir.create(packageLocalDir, recursive = TRUE) +if (is.null(mirror_url)) { + message("Remote URL not provided. Use Apache default.") + mirror_url <- mirror_url_default() +} +# This is temporary, should be removed when released +version <- "spark-releases/spark-2.0.0-rc4-bin" --- End diff -- Why is this version different from the one above?
[GitHub] spark pull request #14258: [Spark-16579][SparkR] add install_spark function
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14258#discussion_r71726450 --- Diff: R/pkg/R/install.R --- @@ -0,0 +1,160 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Functions to install Spark in case the user directly downloads SparkR +# from CRAN. + +#' Download and Install Spark Core to Local Directory +#' +#' \code{install_spark} downloads and installs Spark to local directory if +#' it is not found. The Spark version we use is 2.0.0 (preview). Users can +#' specify a desired Hadoop version, the remote site, and the directory where +#' the package is installed locally. +#' +#' @param hadoop_version Version of Hadoop to install, 2.4, 2.6, +#'2.7 (default) and without +#' @param mirror_url the base URL of the repositories to use +#' @param local_dir local directory that Spark is installed to +#' @return \code{install_spark} returns the local directory +#' where Spark is found or installed +#' @rdname install_spark +#' @name install_spark +#' @export +#' @examples +#'\dontrun{ +#' install_spark() +#'} +#' @note install_spark since 2.1.0 +install_spark <- function(hadoop_version = NULL, mirror_url = NULL, + local_dir = NULL) { + version <- paste0("spark-", spark_version_default()) --- End diff -- No need to create a function for the Spark version; just use "packageVersion("SparkR")".
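A small sketch of the suggestion, assuming the SparkR package version tracks the Spark release; sparkPackageName below is an illustrative helper, not existing code.

```r
# Derive the Spark tarball name from the installed SparkR version instead of a
# separate spark_version_default() helper.
sparkPackageName <- function(hadoop_version = "2.7") {
  version <- paste0("spark-", packageVersion("SparkR"))   # e.g. "spark-2.1.0"
  if (hadoop_version == "without") {
    paste0(version, "-bin-without-hadoop")
  } else {
    paste0(version, "-bin-hadoop", hadoop_version)
  }
}
```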
[GitHub] spark issue #14264: [SPARK-11976][SPARKR] Support "." character in DataFrame...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14264 @rerngvit, I modified the title of SPARK-11977 to narrow its scope. You can go for it.
[GitHub] spark issue #14264: [SPARK-11976][SPARKR] Support "." character in DataFrame...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14264 @rerngvit, sorry, I meant https://issues.apache.org/jira/browse/SPARK-11977. If your PR can enable access to columns with "." in their names without backticks, please first submit a PR for SPARK-11977, as the change is for Spark Core and is not SparkR-specific. After that PR gets merged, you can then submit a PR for SPARK-11976 which contains the SparkR-only changes.
[GitHub] spark issue #14264: [SPARK-11976][SPARKR] Support "." character in DataFrame...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14264 @rerngvit, could you share the background on how this PR can fix the issue? I see that https://issues.apache.org/jira/browse/SPARK-11976 is still open. Did any other PR in Spark 2.0 make this possible?
[GitHub] spark issue #14243: [SPARK-10683][SPARK-16510][SPARKR] Move SparkR include j...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14243 LGTM
[GitHub] spark pull request #14175: [SPARK-16522][MESOS] Spark application throws exc...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14175#discussion_r71284361 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -552,7 +552,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( taskId: String, reason: String): Unit = { stateLock.synchronized { - removeExecutor(taskId, SlaveLost(reason)) + if (!stopCalled) { --- End diff -- comment added.
[GitHub] spark issue #14243: [SPARK-10683][SPARK-16510][SPARKR] Move SparkR include j...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14243 Would it be better to add the SparkR installation check to the existing disabled test? ignore("correctly builds R packages included in a jar with --packages") { ...} My expected solution would be setting an env var when the "sparkr" profile is specified, and running SparkR-related tests when that env var is detected. However, let's go with the current approach for now.
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/12836 no, go ahead to submit one:)
[GitHub] spark issue #14243: [SPARK-10683][SPARK-16510][SPARKR] Move SparkR include j...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/14243 Will this test always run regardless of whether the "sparkr" profile is specified? In other words, does R need to be installed for all Spark tests to pass?
[GitHub] spark pull request #14175: [SPARK-16522][MESOS] Spark application throws exc...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14175#discussion_r71095619 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -552,7 +552,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( taskId: String, reason: String): Unit = { stateLock.synchronized { - removeExecutor(taskId, SlaveLost(reason)) + if (!stopCalled) { --- End diff -- If we add the guard in the parent class, namely CoarseGrainedSchedulerBackend, what's the appropriate behavior of the guard? Silently ignore all message requests after stop() is called and log warnings, or throw an exception? If the latter, then the call to removeExecutor() has to be wrapped in a try. Since the call to removeExecutor() is made in MesosCoarseGrainedSchedulerBackend, I think the current fix is simpler and reasonable.
[GitHub] spark pull request #14192: [SPARK-16509][SPARKR] Rename window.partitionBy a...
GitHub user sun-rui opened a pull request: https://github.com/apache/spark/pull/14192 [SPARK-16509][SPARKR] Rename window.partitionBy and window.orderBy to windowPartitionBy and windowOrderBy. ## What changes were proposed in this pull request? Rename window.partitionBy and window.orderBy to windowPartitionBy and windowOrderBy to pass CRAN package check. ## How was this patch tested? SparkR unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/sun-rui/spark SPARK-16509 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14192.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14192 commit 38b256accd4ff1dabbdb5602eaaa600d9df9562a Author: Sun Rui <sunrui2...@gmail.com> Date: 2016-07-14T02:34:40Z [SPARK-16509][SPARKR] Rename window.partitionBy and window.orderBy to windowPartitionBy and windowOrderBy.
[GitHub] spark pull request #14175: [SPARK-16522][MESOS] Spark application throws exc...
GitHub user sun-rui opened a pull request: https://github.com/apache/spark/pull/14175 [SPARK-16522][MESOS] Spark application throws exception on exit. ## What changes were proposed in this pull request? Spark applications running on Mesos throw an exception upon exit. For details, refer to https://issues.apache.org/jira/browse/SPARK-16522. I am not sure if there is any better fix, so I will wait for review comments. ## How was this patch tested? Manual test. Observed that the exception is gone upon application exit. You can merge this pull request into a Git repository by running: $ git pull https://github.com/sun-rui/spark SPARK-16522 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14175.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14175 commit 6fe96e5879fd97aa630839e670e3d8b17de785be Author: Sun Rui <sunrui2...@gmail.com> Date: 2016-07-13T07:43:38Z [SPARK-16522][MESOS] Spark application throws exception on exit.
[GitHub] spark pull request #14046: [SPARK-16366][SPARKR] Fix time comparison failure...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/14046#discussion_r69501135 --- Diff: R/pkg/inst/tests/testthat/test_sparkSQL.R --- @@ -1258,10 +1258,12 @@ test_that("date functions on a DataFrame", { df2 <- createDataFrame(l2) expect_equal(collect(select(df2, minute(df2$b)))[, 1], c(34, 24)) expect_equal(collect(select(df2, second(df2$b)))[, 1], c(0, 34)) - expect_equal(collect(select(df2, from_utc_timestamp(df2$b, "JST")))[, 1], - c(as.POSIXlt("2012-12-13 21:34:00 UTC"), as.POSIXlt("2014-12-15 10:24:34 UTC"))) - expect_equal(collect(select(df2, to_utc_timestamp(df2$b, "JST")))[, 1], - c(as.POSIXlt("2012-12-13 03:34:00 UTC"), as.POSIXlt("2014-12-14 16:24:34 UTC"))) + t <- c(as.POSIXlt("2012-12-13 21:34:00 UTC"), as.POSIXlt("2014-12-15 10:24:34 UTC")) + attr(t, "tzone") <- NULL --- End diff -- I do not have a deep understanding of time zones in R. Let me spend some time to see if I can find a better fix. It may need a look at date/time handling in the SerDe.
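For context, a small standalone illustration of the tzone attribute behavior the test change works around (plain R, no SparkR involved):

```r
# Two POSIXct values for the same instant; one carries an explicit tzone
# attribute, the other has it stripped, as the test fix does.
t1 <- as.POSIXct("2012-12-13 21:34:00", tz = "UTC")
t2 <- t1
attr(t2, "tzone") <- NULL

t1 == t2            # TRUE: comparison is on the underlying instant
identical(t1, t2)   # FALSE: the attributes differ
# testthat's expect_equal() is attribute-sensitive, so a value collected from
# Spark without a tzone attribute can fail against an expected value that has
# one, even though both represent the same instant.
```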
[GitHub] spark pull request #14046: [SPARK-16366][SPARKR] Fix time comparison failure...
GitHub user sun-rui opened a pull request: https://github.com/apache/spark/pull/14046 [SPARK-16366][SPARKR] Fix time comparison failures in SparkR unit tests. ## What changes were proposed in this pull request? Fix time comparison failures in SparkR unit tests. For details, refer to https://issues.apache.org/jira/browse/SPARK-16366. ## How was this patch tested? SparkR unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/sun-rui/spark SPARK-16366 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14046.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14046 commit 448450ea795a68e5933d62f8c5521f4d520e02bd Author: Sun Rui <sunrui2...@gmail.com> Date: 2016-07-04T15:19:41Z [SPARK-16366][SPARKR] Fix time comparison failures in SparkR unit tests.
[GitHub] spark issue #13760: [SPARK-16012][SparkR] Implement gapplyCollect which will...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/13760 no --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13975: [SPARK-16299][SPARKR] Capture errors from R worke...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13975#discussion_r69089989 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -44,7 +44,7 @@ while (TRUE) { if (inherits(p, "masterProcess")) { close(inputCon) Sys.setenv(SPARKR_WORKER_PORT = port) - source(script) + try(source(script)) --- End diff -- The error still goes to stderr with try(). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
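To illustrate that point with plain R (nothing SparkR-specific): try() keeps the error from propagating, so the code after it still runs, but the error message is still written to stderr unless silent = TRUE.

```r
res <- try(stop("boom"))                   # error message is written to stderr
inherits(res, "try-error")                 # TRUE: execution continued past the error
res <- try(stop("boom"), silent = TRUE)    # nothing printed; the error is only kept in res
```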
[GitHub] spark issue #13975: [SPARK-16299][SPARKR] Capture errors from R workers in d...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/13975 @shivaram, @davies --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13975: [SPARK-16299][SPARKR] Capture errors from R worke...
GitHub user sun-rui opened a pull request: https://github.com/apache/spark/pull/13975 [SPARK-16299][SPARKR] Capture errors from R workers in daemon.R to avoid deletion of R session temporary directory. ## What changes were proposed in this pull request? Capture errors from R workers in daemon.R to avoid deletion of R session temporary directory. See detailed description at https://issues.apache.org/jira/browse/SPARK-16299 ## How was this patch tested? SparkR unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/sun-rui/spark SPARK-16299 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/13975.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #13975 commit fc4ffc12e56b98ae2a56d3a4631797030eacc0ea Author: Sun Rui <sunrui2...@gmail.com> Date: 2016-06-29T15:48:07Z [SPARK-16299][SPARKR] Capture errors from R workers in daemon.R to avoid deletion of R session temporary directory. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13760: [SPARK-16012][SparkR] gapplyCollect - applies a R functi...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/13760 LGTM except for one minor comment --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] gapplyCollect - applies a R...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r67983701 --- Diff: R/pkg/inst/tests/testthat/test_sparkSQL.R --- @@ -2236,12 +2236,15 @@ test_that("gapply() on a DataFrame", { actual <- collect(df1) expect_identical(actual, expected) + df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x }) --- End diff -- Maybe it's better to change list("a") to "a", to test whether a scalar column parameter works. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
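In other words, the suggested variant of the added test line might look like the sketch below (assuming gapplyCollect() accepts a plain column name; df and the identity function are the ones from the quoted test):

```r
# hypothetical tweak: pass the grouping column as a plain string instead of a list
df1Collect <- gapplyCollect(df, "a", function(key, x) { x })
```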
[GitHub] spark pull request #13660: [SPARK-15672][R][DOC] R programming guide update
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13660#discussion_r67881287 --- Diff: docs/sparkr.md --- @@ -262,6 +262,83 @@ head(df) {% endhighlight %} +### Applying User-defined Function +In SparkR, we support several kinds for User-defined Functions: + + Run a given function on a large dataset using `dapply` or `dapplyCollect` + +# dapply +Apply a function to each partition of `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame` +and should have only one parameter, to which a `data.frame` corresponds to each partition will be passed. The output of function +should be a `data.frame`. Schema specifies the row format of the resulting `SparkDataFrame`. It must match the R function's output. + +{% highlight r %} + +# Convert waiting time from hours to seconds. +# Note that we can apply UDF to DataFrame. +schema <- structType(structField("eruptions", "double"), structField("waiting", "double"), + structField("waiting_secs", "double")) +df1 <- dapply(df, function(x) {x <- cbind(x, x$waiting * 60)}, schema) +head(collect(df1)) +## eruptions waiting waiting_secs +##1 3.600 79 4740 +##2 1.800 54 3240 +##3 3.333 74 4440 +##4 2.283 62 3720 +##5 4.533 85 5100 +##6 2.883 55 3300 +{% endhighlight %} + + +# dapplyCollect +Like `dapply`, apply a function to each partition of `SparkDataFrame` and collect the result back. The output of function +should be a `data.frame`. But, Schema is not required to be passed. Note that `dapplyCollect` only can be used if the +output of UDF run on all the partitions can fit in driver memory. + +{% highlight r %} + +# Convert waiting time from hours to seconds. +# Note that we can apply UDF to DataFrame and return a R's data.frame +ldf <- dapplyCollect( + df, + function(x) { + x <- cbind(x, "waiting_secs"=x$waiting * 60) + }) +head(ldf, 3) +## eruptions waiting waiting_secs +##1 3.600 79 4740 +##2 1.800 54 3240 +##3 3.333 74 4440 + +{% endhighlight %} + + + Run many functions in parallel using `spark.lapply` + +# lapply --- End diff -- One thought about spark.lapply() is that documenting here means our commitment to it. This is a case demonstrating the need to support Dataset in SparkR. Maybe next step we can consider replace RDD with Dataset in SparkR --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13660: [SPARK-15672][R][DOC] R programming guide update
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13660#discussion_r67880977 --- Diff: docs/sparkr.md --- @@ -262,6 +262,83 @@ head(df) {% endhighlight %} +### Applying User-defined Function +In SparkR, we support several kinds for User-defined Functions: + + Run a given function on a large dataset using `dapply` or `dapplyCollect` + +# dapply +Apply a function to each partition of `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame` +and should have only one parameter, to which a `data.frame` corresponds to each partition will be passed. The output of function +should be a `data.frame`. Schema specifies the row format of the resulting `SparkDataFrame`. It must match the R function's output. + +{% highlight r %} + +# Convert waiting time from hours to seconds. +# Note that we can apply UDF to DataFrame. +schema <- structType(structField("eruptions", "double"), structField("waiting", "double"), + structField("waiting_secs", "double")) +df1 <- dapply(df, function(x) {x <- cbind(x, x$waiting * 60)}, schema) +head(collect(df1)) +## eruptions waiting waiting_secs +##1 3.600 79 4740 +##2 1.800 54 3240 +##3 3.333 74 4440 +##4 2.283 62 3720 +##5 4.533 85 5100 +##6 2.883 55 3300 +{% endhighlight %} + + +# dapplyCollect +Like `dapply`, apply a function to each partition of `SparkDataFrame` and collect the result back. The output of function +should be a `data.frame`. But, Schema is not required to be passed. Note that `dapplyCollect` only can be used if the +output of UDF run on all the partitions can fit in driver memory. + +{% highlight r %} + +# Convert waiting time from hours to seconds. +# Note that we can apply UDF to DataFrame and return a R's data.frame +ldf <- dapplyCollect( + df, + function(x) { + x <- cbind(x, "waiting_secs"=x$waiting * 60) + }) +head(ldf, 3) +## eruptions waiting waiting_secs +##1 3.600 79 4740 +##2 1.800 54 3240 +##3 3.333 74 4440 + +{% endhighlight %} + + + Run many functions in parallel using `spark.lapply` + +# lapply --- End diff -- spark.lapply --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13660: [SPARK-15672][R][DOC] R programming guide update
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13660#discussion_r67880749 --- Diff: docs/sparkr.md --- @@ -262,6 +262,83 @@ head(df) {% endhighlight %} +### Applying User-defined Function +In SparkR, we support several kinds for User-defined Functions: + + Run a given function on a large dataset using `dapply` or `dapplyCollect` + +# dapply +Apply a function to each partition of `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame` +and should have only one parameter, to which a `data.frame` corresponds to each partition will be passed. The output of function +should be a `data.frame`. Schema specifies the row format of the resulting `SparkDataFrame`. It must match the R function's output. + +{% highlight r %} + +# Convert waiting time from hours to seconds. +# Note that we can apply UDF to DataFrame. +schema <- structType(structField("eruptions", "double"), structField("waiting", "double"), + structField("waiting_secs", "double")) +df1 <- dapply(df, function(x) {x <- cbind(x, x$waiting * 60)}, schema) +head(collect(df1)) +## eruptions waiting waiting_secs +##1 3.600 79 4740 +##2 1.800 54 3240 +##3 3.333 74 4440 +##4 2.283 62 3720 +##5 4.533 85 5100 +##6 2.883 55 3300 +{% endhighlight %} + + +# dapplyCollect +Like `dapply`, apply a function to each partition of `SparkDataFrame` and collect the result back. The output of function +should be a `data.frame`. But, Schema is not required to be passed. Note that `dapplyCollect` only can be used if the +output of UDF run on all the partitions can fit in driver memory. + +{% highlight r %} + +# Convert waiting time from hours to seconds. +# Note that we can apply UDF to DataFrame and return a R's data.frame +ldf <- dapplyCollect( + df, + function(x) { + x <- cbind(x, "waiting_secs"=x$waiting * 60) --- End diff -- style nit: "waiting_secs" = x$waiting * 60 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13660: [SPARK-15672][R][DOC] R programming guide update
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13660#discussion_r67880530 --- Diff: docs/sparkr.md --- @@ -262,6 +262,83 @@ head(df) {% endhighlight %} +### Applying User-defined Function +In SparkR, we support several kinds for User-defined Functions: + + Run a given function on a large dataset using `dapply` or `dapplyCollect` + +# dapply +Apply a function to each partition of `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame` +and should have only one parameter, to which a `data.frame` corresponds to each partition will be passed. The output of function +should be a `data.frame`. Schema specifies the row format of the resulting `SparkDataFrame`. It must match the R function's output. + +{% highlight r %} + +# Convert waiting time from hours to seconds. +# Note that we can apply UDF to DataFrame. +schema <- structType(structField("eruptions", "double"), structField("waiting", "double"), + structField("waiting_secs", "double")) +df1 <- dapply(df, function(x) {x <- cbind(x, x$waiting * 60)}, schema) +head(collect(df1)) +## eruptions waiting waiting_secs +##1 3.600 79 4740 +##2 1.800 54 3240 +##3 3.333 74 4440 +##4 2.283 62 3720 +##5 4.533 85 5100 +##6 2.883 55 3300 +{% endhighlight %} + + +# dapplyCollect +Like `dapply`, apply a function to each partition of `SparkDataFrame` and collect the result back. The output of function --- End diff -- apply a function to each partition of a `SparkDataFrame` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13660: [SPARK-15672][R][DOC] R programming guide update
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13660#discussion_r67880209 --- Diff: docs/sparkr.md --- @@ -262,6 +262,83 @@ head(df) {% endhighlight %} +### Applying User-defined Function +In SparkR, we support several kinds for User-defined Functions: + + Run a given function on a large dataset using `dapply` or `dapplyCollect` + +# dapply +Apply a function to each partition of `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame` --- End diff -- Apply a function to each partition of a `SparkDataFrame`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13660: [SPARK-15672][R][DOC] R programming guide update
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13660#discussion_r67880117 --- Diff: docs/sparkr.md --- @@ -262,6 +262,83 @@ head(df) {% endhighlight %} +### Applying User-defined Function +In SparkR, we support several kinds for User-defined Functions: --- End diff -- several kinds of? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13660: [SPARK-15672][R][DOC] R programming guide update
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/13660 Can you add documentation for gapply() and gapplyCollect() here as well, or will @NarineK do that in another PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13752: [SPARK-16028][SPARKR] spark.lapply can work with active ...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/13752 I think spark.lapply() is a case that demonstrates the need for supporting Dataset in SparkR. Removing the explicit sc parameter is quite helpful for moving to Dataset internally in the future, if Dataset is to be supported. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
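As a rough sketch of the calling style being discussed (the session entry-point name was still under review in #13635 at the time, so sparkR.session() here is an assumption, not the final API):

```r
library(SparkR)
sparkR.session()                       # assumed entry point; no SparkContext handle exposed

# spark.lapply() resolves the active context internally instead of
# taking an explicit sc argument:
squares <- spark.lapply(as.list(1:10), function(x) { x * x })
```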
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] GapplyCollect - applies a R...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r67799970 --- Diff: R/pkg/R/DataFrame.R --- @@ -1347,6 +1347,65 @@ setMethod("gapply", gapply(grouped, func, schema) }) +#' gapplyCollect +#' +#' Groups the SparkDataFrame using the specified columns, applies the R function to each +#' group and collects the result back to R as data.frame. +#' +#' @param x A SparkDataFrame +#' @rdname gapplyCollect +#' @name gapplyCollect +#' @export +#' @examples +#' +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' result <- gapplyCollect( +#' df, +#' list("a", "c"), +#' function(key, x) { +#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) +#' colnames(y) <- c("key_a", "key_c", "mean_b") +#' y +#' }) +#' +#' Result +#' -- +#' key_a key_c mean_b +#' 3 3 3.0 +#' 1 1 1.5 +#' +#' Fits linear models on iris dataset by grouping on the 'Species' column and +#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length' +#' and 'Petal_Width' as training features. +#' +#' df <- createDataFrame (iris) +#' result <- gapplyCollect( +#' df, +#' list(df$"Species"), +#' function(key, x) { +#' m <- suppressWarnings(lm(Sepal_Length ~ +#' Sepal_Width + Petal_Length + Petal_Width, x)) --- End diff -- ident here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] GapplyCollect - applies a R...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r67799929 --- Diff: R/pkg/R/DataFrame.R --- @@ -1347,6 +1347,65 @@ setMethod("gapply", gapply(grouped, func, schema) }) +#' gapplyCollect +#' +#' Groups the SparkDataFrame using the specified columns, applies the R function to each +#' group and collects the result back to R as data.frame. +#' +#' @param x A SparkDataFrame +#' @rdname gapplyCollect +#' @name gapplyCollect +#' @export +#' @examples +#' +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' result <- gapplyCollect( +#' df, +#' list("a", "c"), +#' function(key, x) { +#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) +#' colnames(y) <- c("key_a", "key_c", "mean_b") +#' y +#' }) +#' +#' Result +#' -- +#' key_a key_c mean_b +#' 3 3 3.0 +#' 1 1 1.5 +#' +#' Fits linear models on iris dataset by grouping on the 'Species' column and +#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length' +#' and 'Petal_Width' as training features. +#' +#' df <- createDataFrame (iris) +#' result <- gapplyCollect( +#' df, +#' list(df$"Species"), --- End diff -- no need for a scalar to be a list. just df$"Species" is OK. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] GapplyCollect - applies a R...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r67799741 --- Diff: R/pkg/R/DataFrame.R --- @@ -1347,6 +1347,65 @@ setMethod("gapply", gapply(grouped, func, schema) }) +#' gapplyCollect +#' +#' Groups the SparkDataFrame using the specified columns, applies the R function to each +#' group and collects the result back to R as data.frame. +#' +#' @param x A SparkDataFrame +#' @rdname gapplyCollect +#' @name gapplyCollect +#' @export +#' @examples +#' +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' result <- gapplyCollect( +#' df, +#' list("a", "c"), --- End diff -- use c("a", "c") is more natural? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
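For reference, the example above with the grouping columns passed as a character vector, as suggested (df is the small three-row example frame from the quoted roxygen docs):

```r
result <- gapplyCollect(
  df,
  c("a", "c"),
  function(key, x) {
    y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
    colnames(y) <- c("key_a", "key_c", "mean_b")
    y
  })
```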
[GitHub] spark issue #13790: remove duplicated docs in dapply
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/13790 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13763: [SPARK-16051][R] Add `read.orc/write.orc` to Spar...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13763#discussion_r67610942 --- Diff: R/pkg/R/SQLContext.R --- @@ -330,6 +330,30 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) { } } +#' Create a SparkDataFrame from a ORC file. +#' +#' Loads a ORC file, returning the result as a SparkDataFrame. +#' +#' @param path Path of file to read. +#' @return SparkDataFrame +#' @rdname read.orc +#' @export +#' @name read.orc +#' @method read.orc default +#' @note read.orc since 2.0.0 +read.orc.default <- function(path) { --- End diff -- Since read.orc is a new API method, this is not needed for backward compatibility? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13763: [SPARK-16051][R] Add `read.orc/write.orc` to Spar...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13763#discussion_r67610931 --- Diff: R/pkg/R/SQLContext.R --- @@ -330,6 +330,30 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) { } } +#' Create a SparkDataFrame from a ORC file. --- End diff -- an ORC --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13763: [SPARK-16051][R] Add `read.orc/write.orc` to Spar...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13763#discussion_r67610926 --- Diff: R/pkg/R/DataFrame.R --- @@ -701,6 +701,33 @@ setMethod("write.json", invisible(callJMethod(write, "json", path)) }) +#' Save the contents of SparkDataFrame as a ORC file, preserving the schema. --- End diff -- an ORC --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13684: [SPARK-15908][R] Add varargs-type dropDuplicates() funct...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/13684 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13635: [SPARK-15159][SPARKR] SparkR SparkSession API
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13635#discussion_r67385138 --- Diff: R/pkg/R/sparkR.R --- @@ -270,27 +291,97 @@ sparkRSQL.init <- function(jsc = NULL) { #'} sparkRHive.init <- function(jsc = NULL) { - if (exists(".sparkRHivesc", envir = .sparkREnv)) { -return(get(".sparkRHivesc", envir = .sparkREnv)) + .Deprecated("sparkR.session.getOrCreate") + + if (exists(".sparkRsession", envir = .sparkREnv)) { +return(get(".sparkRsession", envir = .sparkREnv)) } - # If jsc is NULL, create a Spark Context - sc <- if (is.null(jsc)) { -sparkR.init() - } else { -jsc + # Default to without Hive support for backward compatibility. + sparkR.session.getOrCreate(enableHiveSupport = TRUE) +} + +#' Get the existing SparkSession or initialize a new SparkSession. +#' +#' Additional Spark properties can be set (...), and these named parameters take priority over +#' over values in master, appName, named lists of sparkConfig. +#' +#' @param master The Spark master URL +#' @param appName Application name to register with cluster manager +#' @param sparkHome Spark Home directory +#' @param sparkConfig Named list of Spark configuration to set on worker nodes +#' @param sparkExecutorConfig Named list of Spark configuration to be used when launching executors +#' @param sparkJars Character vector of jar files to pass to the worker nodes +#' @param sparkPackages Character vector of packages from spark-packages.org +#' @param enableHiveSupport Enable support for Hive; once set, this cannot be turned off on an +#'existing session +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session.getOrCreate() +#' df <- read.json(path) +#' +#' sparkR.session.getOrCreate("local[2]", "SparkR", "/home/spark") +#' sparkR.session.getOrCreate("yarn-client", "SparkR", "/home/spark", +#'list(spark.executor.memory="4g"), +#'list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"), +#'c("one.jar", "two.jar", "three.jar"), +#'c("com.databricks:spark-avro_2.10:2.0.1")) +#' sparkR.session.getOrCreate(spark.master = "yarn-client", +#'spark.executor.memory = "4g") +#'} +#' @note since 2.0.0 + +sparkR.session.getOrCreate <- function( + master = "", + appName = "SparkR", + sparkHome = Sys.getenv("SPARK_HOME"), + sparkConfig = list(), + sparkExecutorConfig = list(), + sparkJars = "", + sparkPackages = "", + enableHiveSupport = TRUE, --- End diff -- enableHiveSupport is better to be FALSE --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13635: [SPARK-15159][SPARKR] SparkR SparkSession API
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13635#discussion_r67384711 --- Diff: R/pkg/NAMESPACE --- @@ -6,10 +6,15 @@ importFrom(methods, setGeneric, setMethod, setOldClass) #useDynLib(SparkR, stringHashCode) # S3 methods exported +export("sparkR.session.getOrCreate") --- End diff -- While SparkSession is the main entry point, SparkContext does remain. My thoughts are: 1. Keep sparkR.init() as is. It still returns the SparkContext. 2. Add new APIs like sparkRSession.init() and sparkRSession.stop(). sparkRSession.init() has two forms: A. It can accept a SparkContext as a parameter, and no other Spark configuration. B. Just like the current sparkR.session.getOrCreate(), it internally creates a SparkContext. 3. Keep sparkRSQL.init() and sparkRHive.init() for backward compatibility, while they are updated to call sparkRSession.init(). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
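Purely as a hypothetical restatement of points 1-3 in code form (none of these sparkRSession.* names are established SparkR APIs; they are only the shapes proposed in the comment above):

```r
sc <- sparkR.init()                       # 1. unchanged; still returns a SparkContext

spark <- sparkRSession.init(sc)           # 2A. session built on an existing SparkContext,
                                          #     accepting no other Spark configuration
spark <- sparkRSession.init(master = "local[2]", appName = "SparkR")
                                          # 2B. like sparkR.session.getOrCreate(), creates
                                          #     a SparkContext internally
sparkRSession.stop(spark)

sqlContext <- sparkRSQL.init(sc)          # 3. kept for backward compatibility; would
                                          #    delegate to sparkRSession.init() internally
```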
[GitHub] spark pull request #13684: [SPARK-15908][R] Add varargs-type dropDuplicates(...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13684#discussion_r67287926 --- Diff: R/pkg/R/DataFrame.R --- @@ -1856,10 +1856,11 @@ setMethod("where", #' the subset of columns. #' #' @param x A SparkDataFrame. -#' @param colnames A character vector of column names. +#' @param ... A character vector of column names or string column names. +#' If the first argument contains a character vector then the following column names are ignored. --- End diff -- Align this line with "A character ..." in the line above. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13635: [SPARK-15159][SPARKR] SparkR SparkSession API
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/13635 @shivaram, I'll probably take a look at this tonight. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/12836 @shivaram, LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r67161527 --- Diff: R/pkg/inst/worker/worker.R --- @@ -79,75 +127,72 @@ if (numBroadcastVars > 0) { # Timing broadcast broadcastElap <- elapsedSecs() +# Initial input timing +inputElap <- broadcastElap # If -1: read as normal RDD; if >= 0, treat as pairwise RDD and treat the int # as number of partitions to create. numPartitions <- SparkR:::readInt(inputCon) -isDataFrame <- as.logical(SparkR:::readInt(inputCon)) +# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode +mode <- SparkR:::readInt(inputCon) -# If isDataFrame, then read column names -if (isDataFrame) { +if (mode > 0) { colNames <- SparkR:::readObject(inputCon) } isEmpty <- SparkR:::readInt(inputCon) +computeInputElapsDiff <- 0 +outputComputeElapsDiff <- 0 if (isEmpty != 0) { - if (numPartitions == -1) { if (deserializer == "byte") { # Now read as many characters as described in funcLen data <- SparkR:::readDeserialize(inputCon) } else if (deserializer == "string") { data <- as.list(readLines(inputCon)) -} else if (deserializer == "row") { +} else if (deserializer == "row" && mode == 2) { + dataWithKeys <- SparkR:::readMultipleObjectsWithKeys(inputCon) + keys <- dataWithKeys$keys + data <- dataWithKeys$data +} else if (deserializer == "row"){ data <- SparkR:::readMultipleObjects(inputCon) } -# Timing reading input data for execution -inputElap <- elapsedSecs() -if (isDataFrame) { - if (deserializer == "row") { -# Transform the list of rows into a data.frame -# Note that the optional argument stringsAsFactors for rbind is -# available since R 3.2.4. So we set the global option here. -oldOpt <- getOption("stringsAsFactors") -options(stringsAsFactors = FALSE) -data <- do.call(rbind.data.frame, data) -options(stringsAsFactors = oldOpt) - -names(data) <- colNames - } else { -# Check to see if data is a valid data.frame -stopifnot(deserializer == "byte") -stopifnot(class(data) == "data.frame") - } - output <- computeFunc(data) - if (serializer == "row") { -# Transform the result data.frame back to a list of rows -output <- split(output, seq(nrow(output))) - } else { -# Serialize the ouput to a byte array -stopifnot(serializer == "byte") +inputElap <- elapsedSecs() +if (mode > 0) { + if (mode == 1) { +# Timing reading input data for execution +output <- compute(mode, partition, serializer, deserializer, NULL, +colNames, computeFunc, outputCon, data) + } else { +# gapply mode +for (i in 1:length(data)) { + # Timing reading input data for execution + inputElap <- elapsedSecs() + output <- compute(mode, partition, serializer, deserializer, keys[[i]], + colNames, computeFunc, outputCon, data[[i]]) + computeElap <- elapsedSecs() + outputResult(serializer, output, outputCon) + outputElap <- elapsedSecs() + computeInputElapsDiff <- computeInputElapsDiff + (computeElap - inputElap) + outputComputeElapsDiff <- outputComputeElapsDiff + (outputElap - computeElap) +} } } else { - output <- computeFunc(partition, data) + # Timing reading input data for execution --- End diff -- this one? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/12836 @NarineK, there is one comment left unaddressed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r66733080 --- Diff: core/src/main/scala/org/apache/spark/api/r/RRunner.scala --- @@ -40,7 +40,8 @@ private[spark] class RRunner[U]( broadcastVars: Array[Broadcast[Object]], numPartitions: Int = -1, isDataFrame: Boolean = false, -colNames: Array[String] = null) +colNames: Array[String] = null, +mode: Int = 0) --- End diff -- It is better to define an enumeration for the mode instead of hard-coding it, for example: private[sql] object RRunnerModes { val RDD = 0; val DATAFRAME_DAPPLY = 1; val DATAFRAME_GAPPLY = 2 } --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/12836 @shivaram, I think we are reaching the final version :). It would be good if you could have a detailed review of the examples and test cases. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r66721674 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala --- @@ -325,6 +330,71 @@ case class MapGroupsExec( } /** + * Groups the input rows together and calls the R function with each group and an iterator + * containing all elements in the group. + * The result of this function is flattened before being output. + */ +case class FlatMapGroupsInRExec( +func: Array[Byte], +packageNames: Array[Byte], +broadcastVars: Array[Broadcast[Object]], +inputSchema: StructType, +outputSchema: StructType, +keyDeserializer: Expression, +valueDeserializer: Expression, +groupingAttributes: Seq[Attribute], +dataAttributes: Seq[Attribute], +outputObjAttr: Attribute, +child: SparkPlan) extends UnaryExecNode with ObjectProducerExec { + + override def output: Seq[Attribute] = outputObjAttr :: Nil + override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr) + + override def requiredChildDistribution: Seq[Distribution] = +ClusteredDistribution(groupingAttributes) :: Nil + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(groupingAttributes.map(SortOrder(_, Ascending))) + + override protected def doExecute(): RDD[InternalRow] = { +val isSerializedRData = + if (outputSchema == SERIALIZED_R_DATA_SCHEMA) true else false +val serializerForR = if (!isSerializedRData) { + SerializationFormats.ROW +} else { + SerializationFormats.BYTE +} + +child.execute().mapPartitionsInternal { iter => + val grouped = GroupedIterator(iter, groupingAttributes, child.output) + val getKey = ObjectOperator.deserializeRowToObject(keyDeserializer, groupingAttributes) + val getValue = ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes) + val outputObject = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType) + val runner = new RRunner[Array[Byte]]( +func, SerializationFormats.ROW, serializerForR, packageNames, broadcastVars, +isDataFrame = true, colNames = inputSchema.fieldNames, mode = 2) + + val groupedRBytes = grouped.flatMap { case (key, rowIter) => --- End diff -- why flatMap here? map() should be OK. is there something I miss? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r66721643 --- Diff: R/pkg/inst/worker/worker.R --- @@ -79,75 +127,72 @@ if (numBroadcastVars > 0) { # Timing broadcast broadcastElap <- elapsedSecs() +# Initial input timing +inputElap <- broadcastElap # If -1: read as normal RDD; if >= 0, treat as pairwise RDD and treat the int # as number of partitions to create. numPartitions <- SparkR:::readInt(inputCon) -isDataFrame <- as.logical(SparkR:::readInt(inputCon)) +# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode +mode <- SparkR:::readInt(inputCon) -# If isDataFrame, then read column names -if (isDataFrame) { +if (mode > 0) { colNames <- SparkR:::readObject(inputCon) } isEmpty <- SparkR:::readInt(inputCon) +computeInputElapsDiff <- 0 +outputComputeElapsDiff <- 0 if (isEmpty != 0) { - if (numPartitions == -1) { if (deserializer == "byte") { # Now read as many characters as described in funcLen data <- SparkR:::readDeserialize(inputCon) } else if (deserializer == "string") { data <- as.list(readLines(inputCon)) -} else if (deserializer == "row") { +} else if (deserializer == "row" && mode == 2) { + dataWithKeys <- SparkR:::readMultipleObjectsWithKeys(inputCon) + keys <- dataWithKeys$keys + data <- dataWithKeys$data +} else if (deserializer == "row"){ data <- SparkR:::readMultipleObjects(inputCon) } -# Timing reading input data for execution -inputElap <- elapsedSecs() -if (isDataFrame) { - if (deserializer == "row") { -# Transform the list of rows into a data.frame -# Note that the optional argument stringsAsFactors for rbind is -# available since R 3.2.4. So we set the global option here. -oldOpt <- getOption("stringsAsFactors") -options(stringsAsFactors = FALSE) -data <- do.call(rbind.data.frame, data) -options(stringsAsFactors = oldOpt) - -names(data) <- colNames - } else { -# Check to see if data is a valid data.frame -stopifnot(deserializer == "byte") -stopifnot(class(data) == "data.frame") - } - output <- computeFunc(data) - if (serializer == "row") { -# Transform the result data.frame back to a list of rows -output <- split(output, seq(nrow(output))) - } else { -# Serialize the ouput to a byte array -stopifnot(serializer == "byte") +inputElap <- elapsedSecs() +if (mode > 0) { + if (mode == 1) { +# Timing reading input data for execution +output <- compute(mode, partition, serializer, deserializer, NULL, +colNames, computeFunc, outputCon, data) + } else { +# gapply mode +for (i in 1:length(data)) { + # Timing reading input data for execution + inputElap <- elapsedSecs() + output <- compute(mode, partition, serializer, deserializer, keys[[i]], + colNames, computeFunc, outputCon, data[[i]]) + computeElap <- elapsedSecs() + outputResult(serializer, output, outputCon) + outputElap <- elapsedSecs() + computeInputElapsDiff <- computeInputElapsDiff + (computeElap - inputElap) + outputComputeElapsDiff <- outputComputeElapsDiff + (outputElap - computeElap) +} } } else { - output <- computeFunc(partition, data) + # Timing reading input data for execution --- End diff -- this line of comment should be located before line 163? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r66721611 --- Diff: R/pkg/inst/worker/worker.R --- @@ -79,75 +127,72 @@ if (numBroadcastVars > 0) { # Timing broadcast broadcastElap <- elapsedSecs() +# Initial input timing +inputElap <- broadcastElap # If -1: read as normal RDD; if >= 0, treat as pairwise RDD and treat the int # as number of partitions to create. numPartitions <- SparkR:::readInt(inputCon) -isDataFrame <- as.logical(SparkR:::readInt(inputCon)) +# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode +mode <- SparkR:::readInt(inputCon) -# If isDataFrame, then read column names -if (isDataFrame) { +if (mode > 0) { colNames <- SparkR:::readObject(inputCon) } isEmpty <- SparkR:::readInt(inputCon) +computeInputElapsDiff <- 0 +outputComputeElapsDiff <- 0 if (isEmpty != 0) { - if (numPartitions == -1) { if (deserializer == "byte") { # Now read as many characters as described in funcLen data <- SparkR:::readDeserialize(inputCon) } else if (deserializer == "string") { data <- as.list(readLines(inputCon)) -} else if (deserializer == "row") { +} else if (deserializer == "row" && mode == 2) { + dataWithKeys <- SparkR:::readMultipleObjectsWithKeys(inputCon) + keys <- dataWithKeys$keys + data <- dataWithKeys$data +} else if (deserializer == "row"){ --- End diff -- style: (deserializer == "row") { --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r66721551 --- Diff: R/pkg/inst/worker/worker.R --- @@ -27,6 +27,54 @@ elapsedSecs <- function() { proc.time()[3] } +compute <- function(mode, partition, serializer, deserializer, key, --- End diff -- Remove the unused argument and re-order the arguments, e.g. function(computeFunc, mode, partition, key, inputData, serializer, deserializer, colNames) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r66721485 --- Diff: R/pkg/R/group.R --- @@ -142,3 +142,58 @@ createMethods <- function() { } createMethods() + +#' gapply +#' +#' Applies a R function to each group in the input GroupedData +#' +#' @param x a GroupedData +#' @param func A function to be applied to each group partition specified by GroupedData. +#' The function `func` takes as argument a key - grouping columns and +#' a data frame - a local R data.frame. +#' The output of `func` is a local R data.frame. +#' @param schema The schema of the resulting SparkDataFrame after the function is applied. +#' It must match the output of func. +#' @return a SparkDataFrame +#' @rdname gapply +#' @name gapply +#' @examples +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' df <- createDataFrame ( +#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), +#' c("a", "b", "c", "d")) +#' +#' schema <- structType(structField("a", "integer"), structField("c", "string"), +#' structField("avg", "double")) +#' df1 <- gapply( +#' df, +#' list("a", "c"), +#' function(key, x) { +#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) +#' }, +#' schema) +#' collect(df1) +#' +#' Result +#' -- +#' a c avg +#' 3 3 3.0 +#' 1 1 1.5 +#' } +setMethod("gapply", + signature(x = "GroupedData"), + function(x, func, schema) { +packageNamesArr <- serialize(.sparkREnv[[".packages"]], + connection = NULL) +broadcastArr <- lapply(ls(.broadcastNames), + function(name) { get(name, .broadcastNames) }) +sdf <- callJMethod(x@sgd, "flatMapGroupsInR", + serialize(cleanClosure(func), connection = NULL), + packageNamesArr, + broadcastArr, + if (is.null(schema)) { schema } else { schema$jobj }) --- End diff -- same as dapply, we don't support NULL schema for now. fails when schema is NULL --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r66721436 --- Diff: R/pkg/R/group.R --- @@ -142,3 +142,58 @@ createMethods <- function() { } createMethods() + +#' gapply +#' +#' Applies a R function to each group in the input GroupedData +#' +#' @param x a GroupedData +#' @param func A function to be applied to each group partition specified by GroupedData. +#' The function `func` takes as argument a key - grouping columns and +#' a data frame - a local R data.frame. +#' The output of `func` is a local R data.frame. +#' @param schema The schema of the resulting SparkDataFrame after the function is applied. +#' It must match the output of func. +#' @return a SparkDataFrame +#' @rdname gapply +#' @name gapply +#' @examples +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' df <- createDataFrame ( +#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), +#' c("a", "b", "c", "d")) +#' +#' schema <- structType(structField("a", "integer"), structField("c", "string"), +#' structField("avg", "double")) +#' df1 <- gapply( +#' df, +#' list("a", "c"), +#' function(key, x) { +#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) +#' }, +#' schema) +#' collect(df1) +#' +#' Result +#' -- +#' a c avg +#' 3 3 3.0 +#' 1 1 1.5 +#' } +setMethod("gapply", + signature(x = "GroupedData"), + function(x, func, schema) { +packageNamesArr <- serialize(.sparkREnv[[".packages"]], + connection = NULL) +broadcastArr <- lapply(ls(.broadcastNames), + function(name) { get(name, .broadcastNames) }) +sdf <- callJMethod(x@sgd, "flatMapGroupsInR", --- End diff -- As discussed above, call a helper function in SQLUtils instead of calling the function of GroupedData --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/12836 yes, let's do it in a separate PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r66721354 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -381,6 +385,50 @@ class RelationalGroupedDataset protected[sql]( def pivot(pivotColumn: String, values: java.util.List[Any]): RelationalGroupedDataset = { pivot(pivotColumn, values.asScala) } + + /** + * Applies the given serialized R function `func` to each group of data. For each unique group, + * the function will be passed the group key and an iterator that contains all of the elements in + * the group. The function can return an iterator containing elements of an arbitrary type which + * will be returned as a new [[DataFrame]]. + * + * This function does not support partial aggregation, and as a result requires shuffling all + * the data in the [[Dataset]]. If an application intends to perform an aggregation over each + * key, it is best to use the reduce function or an + * [[org.apache.spark.sql.expressions#Aggregator Aggregator]]. + * + * Internally, the implementation will spill to disk if any given group is too large to fit into + * memory. However, users must take care to avoid materializing the whole iterator for a group + * (for example, by calling `toList`) unless they are sure that this is possible given the memory + * constraints of their cluster. + * + * @since 2.0.0 + */ + private[sql] def flatMapGroupsInR( + f: Array[Byte], + packageNames: Array[Byte], + broadcastVars: Array[Object], + outputSchema: StructType): DataFrame = { + val broadcastVarObj = broadcastVars.map(_.asInstanceOf[Broadcast[Object]]) + val groupingNamedExpressions = groupingExprs.map(alias) + val groupingCols = groupingNamedExpressions.map(Column(_)) + val groupingDataFrame = df.select(groupingCols : _*) + val groupingAttributes = groupingNamedExpressions.map(_.toAttribute) + val realOutputSchema = if (outputSchema == null) SERIALIZED_R_DATA_SCHEMA else outputSchema --- End diff -- yeah, I think we need gapplyCollect() --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r66721347 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -381,6 +385,50 @@ class RelationalGroupedDataset protected[sql]( def pivot(pivotColumn: String, values: java.util.List[Any]): RelationalGroupedDataset = { pivot(pivotColumn, values.asScala) } + + /** + * Applies the given serialized R function `func` to each group of data. For each unique group, + * the function will be passed the group key and an iterator that contains all of the elements in + * the group. The function can return an iterator containing elements of an arbitrary type which + * will be returned as a new [[DataFrame]]. + * + * This function does not support partial aggregation, and as a result requires shuffling all + * the data in the [[Dataset]]. If an application intends to perform an aggregation over each + * key, it is best to use the reduce function or an + * [[org.apache.spark.sql.expressions#Aggregator Aggregator]]. + * + * Internally, the implementation will spill to disk if any given group is too large to fit into + * memory. However, users must take care to avoid materializing the whole iterator for a group + * (for example, by calling `toList`) unless they are sure that this is possible given the memory + * constraints of their cluster. + * + * @since 2.0.0 + */ + private[sql] def flatMapGroupsInR( + f: Array[Byte], + packageNames: Array[Byte], + broadcastVars: Array[Object], + outputSchema: StructType): DataFrame = { + val broadcastVarObj = broadcastVars.map(_.asInstanceOf[Broadcast[Object]]) + val groupingNamedExpressions = groupingExprs.map(alias) + val groupingCols = groupingNamedExpressions.map(Column(_)) + val groupingDataFrame = df.select(groupingCols : _*) + val groupingAttributes = groupingNamedExpressions.map(_.toAttribute) + val realOutputSchema = if (outputSchema == null) SERIALIZED_R_DATA_SCHEMA else outputSchema --- End diff -- @NarineK, you can add a helper function like https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala#L143, it will pass in the schema for serialized R data when user-specified schema is NULL. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13394: [SPARK-15490][R][DOC] SparkR 2.0 QA: New R APIs a...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/13394#discussion_r66019351 --- Diff: R/pkg/R/functions.R --- @@ -249,6 +249,10 @@ col <- function(x) { #' #' Returns a Column based on the given column name. #' +#' Though scala functions has "col" function, we don't expose it in SparkR --- End diff -- No. "col" is not exposed in SparkR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
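For readers of the docs, a short illustration of what is available instead, since col() itself is not exported from SparkR; the example assumes a SparkSession/SQLContext is already running.

```r
library(SparkR)

df <- createDataFrame(data.frame(name = c("a", "b", "c"), age = c(10, 20, 30)))

# col() is not exposed; these are the exported ways to obtain a Column
c1 <- column("age")   # exported constructor wrapping the same JVM function
c2 <- df$age          # extract operator on a SparkDataFrame

head(select(df, c1))
head(select(df, c2))
```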
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/12836 I guess the byte array of the serialized R function is being dumped in the output. Let me find which commit caused this. Something like overriding toString should solve it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org

[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user sun-rui commented on the issue: https://github.com/apache/spark/pull/12836 @NarineK, thanks for the hard work. I left some comments for you. @shivaram, do we still have a time window for this to make it into 2.0? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65563636 --- Diff: R/pkg/inst/worker/worker.R --- @@ -84,68 +136,51 @@ broadcastElap <- elapsedSecs() # as number of partitions to create. numPartitions <- SparkR:::readInt(inputCon) -isDataFrame <- as.logical(SparkR:::readInt(inputCon)) +# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode +mode <- SparkR:::readInt(inputCon) -# If isDataFrame, then read column names -if (isDataFrame) { +if (mode > 0) { colNames <- SparkR:::readObject(inputCon) + if (mode == 2) { --- End diff -- grouping column names are not necessary --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
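A hedged sketch of what the worker-side handshake could look like once the redundant grouping-column-name read is dropped, following the naming in the diff above; the exact wire format is an assumption for illustration.

```r
# inside R/pkg/inst/worker/worker.R, after the input connection is set up
# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode
mode <- SparkR:::readInt(inputCon)

if (mode > 0) {
  # both dapply and gapply operate on data.frames, so column names are read
  colNames <- SparkR:::readObject(inputCon)
  # per the review comment, no separate list of grouping column names is read:
  # in gapply mode the key arrives alongside each group's rows (assumption)
}
```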
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65563441 --- Diff: R/pkg/inst/worker/worker.R --- @@ -27,6 +27,58 @@ elapsedSecs <- function() { proc.time()[3] } +computeHelper <- function(mode, partition, serializer, deserializer, key, + colNames, computeFunc, outputCon, inputData) { --- End diff -- coding style: indentation alignment --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
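For reference, the alignment the style comment is asking for looks like this: continuation arguments line up under the first argument of the opening line (formatting illustration only, body elided).

```r
computeHelper <- function(mode, partition, serializer, deserializer, key,
                          colNames, computeFunc, outputCon, inputData) {
  # body elided; only the argument alignment is the point here
  NULL
}
```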
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65563298 --- Diff: R/pkg/R/DataFrame.R --- @@ -1266,6 +1266,83 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Group the SparkDataFrame using the specified columns and apply the R function to each +#' group. The group is defined by input grouping columns. --- End diff -- the second sentence is not necessary. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org