[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18320


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123722811
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,55 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connection by default.
+selectTimeout <- NULL
+
+# Exit code that children send to the parent to indicate they exited.
+exitCode <- 1
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each child terminates
+  # itself, it appears that the resource is not released properly, which causes an unexpected
+  # termination of this daemon due to, for example, running out of file descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to retrieve children
+  # that have exited (but are not terminated) and then sends a kill signal to terminate them
+  # properly in the parent.
+  #
+  # There are two paths by which it attempts to send a signal to terminate the children in the parent:
+  #
+  #   1. Every second, if any socket connection is not available and if there are child workers
+  #      running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children every second if
+  # any worker is running, or right before launching other worker children from the following
+  # new socket connection.
+
+  # Only the process IDs of children sent data to the parent are returned below. The children
+  # send a custom exit code to the parent after exiting, and the parent tries
+  # to terminate them only if they sent the exit code.
+  children <- parallel:::selectChildren(timeout = 0)
+
+  if (is.integer(children)) {
+    lapply(children, function(child) {
+      # This data should be raw bytes if any data was sent from this child.
+      # Otherwise, this returns the PID.
+      data <- parallel:::readChild(child)
+      if (is.raw(data)) {
+        # This checks if the data from this child is the exit code that indicates an exited child.
+        if (unserialize(data) == exitCode) {
+          # If so, we terminate this child.
+          tools::pskill(child, tools::SIGUSR1)
+        }
+      }
+    })
+  } else if (is.null(children)) {
+    # If it is NULL, there are no such children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
--- End diff --

Definitely.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123719067
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,55 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
[... same hunk as quoted in full in the first message above ...]
+  } else if (is.null(children)) {
+    # If it is NULL, there are no such children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
--- End diff --

nit: `If it is NULL, there are no such children.` -> `If it is NULL, there are no any children.`?





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123718675
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,55 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
[... same hunk as quoted in full in the first message above ...]
+  } else if (is.null(children)) {
+    # If it is NULL, there are no such children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
--- End diff --

Oh, cool. LGTM.





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123717458
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,55 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
[... same hunk as quoted in full in the first message above ...]
+  } else if (is.null(children)) {
+    # If it is NULL, there are no such children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
--- End diff --

I ran several tests for this: https://github.com/apache/spark/pull/18320#discussion_r122860310 and https://github.com/apache/spark/pull/18320#discussion_r123675088

```
vi tmp.R
```

copy and paste

```R
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  Sys.sleep(3L)
  parallel:::mcexit(0L)
}

print("TRUE - the timeout was reached")
print("parallel:::selectChildren(timeout = 0):")
print(parallel:::selectChildren(timeout = 0))
```

and ...

```
Rscript tmp.R
```

If data is not available, this should wait up to `timeout`, to my knowledge, rather than returning `NULL`.
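
For what it's worth, a tiny timing check along the same lines (a sketch only; it uses the internal, unexported `parallel:::` API on a Unix-alike, so behavior may vary across R versions):

```r
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  # Child: send nothing for a while, then exit.
  Sys.sleep(2L)
  parallel:::mcexit(0L)
}

t0 <- Sys.time()
res <- parallel:::selectChildren(timeout = 1)
elapsed <- as.numeric(Sys.time() - t0, units = "secs")

# Expected under the reading above: a child exists but has no data yet, so
# this blocks for about `timeout` seconds and yields TRUE (timeout reached)
# instead of returning NULL immediately.
print(res)
print(elapsed)
```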







[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123717120
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,55 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
[... same hunk as quoted in full in the first message above ...]
+  } else if (is.null(children)) {
+    # If it is NULL, there are no such children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
--- End diff --

I have this question because you have a comment:

> Only the process IDs of children sent data to the parent are returned below.

So I am not sure: if there are no children that sent data to the parent, is the returned value `NULL` or something else?





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123716354
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,55 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
[... same hunk as quoted in full in the first message above ...]
+  } else if (is.null(children)) {
+    # If it is NULL, there are no such children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
--- End diff --

Does it return `NULL` if there are no children, or if there are no children that have data available?





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123715868
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,55 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
[... same hunk as quoted in full in the first message above ...]
+  } else if (is.null(children)) {
+    # If it is NULL, there are no such children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
--- End diff --

It becomes `NULL` if there are no children, and it returns `TRUE` if there are children here (but none with data before the timeout). So, we turn back to waiting indefinitely only if there are no children at all.

https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/children.html

> TRUE if the timeout was reached, FALSE if an error occurred (e.g., if the master process was interrupted), or an integer vector of process IDs with children that have data available, or NULL if there are no children.

Would this answer your question (or did I maybe misunderstand)?
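
To make the four documented return shapes concrete, they could be dispatched on explicitly, e.g. (a sketch against the internal `parallel:::` API, reusing the names from the diff above):

```r
children <- parallel:::selectChildren(timeout = 0)
if (is.null(children)) {
  # No children at all: safe to block on the socket indefinitely again.
  selectTimeout <- NULL
} else if (isTRUE(children)) {
  # Children exist but none had data before the timeout: keep polling.
} else if (isFALSE(children)) {
  # An error occurred, e.g. the master process was interrupted.
} else if (is.integer(children)) {
  # PIDs of children that have data available: read and possibly terminate them.
}
```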





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123714768
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,55 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
[... same hunk as quoted in full in the first message above ...]
+  } else if (is.null(children)) {
+    # If it is NULL, there are no such children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
--- End diff --

One path described above is:

> 1. Every second if any socket connection is not available and if there are child workers running.

So forking succeeded and we want to check every second, but after one second, if no children sent data, we turn back immediately to waiting indefinitely. Doesn't that mean we don't actually run the logic every second, but only for the first second?
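
For reference, here is a condensed sketch of the control flow in question. The `selectTimeout <- 1` after forking is an assumption based on this thread (that line is not shown in the quoted diff):

```r
selectTimeout <- NULL  # block on the socket indefinitely by default

while (TRUE) {
  ready <- socketSelect(list(inputCon), timeout = selectTimeout)

  children <- parallel:::selectChildren(timeout = 0)
  if (is.integer(children)) {
    # Some children sent data: terminate the ones that reported the exit code.
  } else if (is.null(children)) {
    # No children remain at all: go back to waiting indefinitely.
    selectTimeout <- NULL
  }
  # When selectChildren() returns TRUE (children alive, but no data yet),
  # selectTimeout is left unchanged, so the parent keeps polling every second.

  if (ready) {
    # Fork a new worker here; from then on, poll children every second.
    selectTimeout <- 1
  }
}
```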









[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123701851
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,51 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connection by default.
+selectTimeout <- NULL
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each child terminates
+  # itself, it appears that the resource is not released properly, which causes an unexpected
+  # termination of this daemon due to, for example, running out of file descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to retrieve children
+  # that have exited (but are not terminated) and then sends a kill signal to terminate them
+  # properly in the parent.
+  #
+  # There are two paths by which it attempts to send a signal to terminate the children in the parent:
+  #
+  #   1. Every second, if any socket connection is not available and if there are child workers
+  #      running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children every second if
+  # any worker is running, or right before launching other worker children from the following
+  # new socket connection.
+
+  # Only the process IDs of exited children are returned and the termination is attempted below.
+  children <- parallel:::selectChildren(timeout = 0)
+  if (is.integer(children)) {
+    # If it is PIDs, there are workers that exited but were not terminated. Attempts to terminate
+    # them by sending SIGUSR1.
+    lapply(children, function(child) {
--- End diff --

I also tested

```r
kill <- function(children) {
  lapply(children, function(child) {
    data <- parallel:::readChild(child)
    if (is.raw(data)) {
      if (unserialize(data) == -1) {
        print(paste("child PID:", child, "and parent will kill given:", unserialize(data)))
        tools::pskill(child, tools::SIGUSR1)
      } else {
        print(paste("child PID:", child, "and sent a data:", unserialize(data)))
      }
    } else {
      print(paste("child PID:", child, "and sent a PID:", data))
    }
  })
}

for (i in 0:1000) {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
    # Sys.sleep(3L)

    # Send no data
    parallel:::mcexit(0L)
  } else {
    p <- parallel:::mcfork()
    if (inherits(p, "masterProcess")) {
      ## Sys.sleep(3L)

      # Send custom data
      parallel:::sendMaster("arbitrary")
      parallel:::sendMaster(123)
      parallel:::mcexit(0L)
    } else {
      p <- parallel:::mcfork()
      if (inherits(p, "masterProcess")) {
        ### Sys.sleep(3L)

        # Send explicit signal.
        parallel:::mcexit(0L, send = -1)
      } else {
        p <- parallel:::mcfork()
        if (inherits(p, "masterProcess")) {
          ### Sys.sleep(10L)

          # Check if `readChild` waits
          parallel:::mcexit(0L, send = -1)
        }
      }
    }
  }

  c <- parallel:::selectChildren(timeout = 0)
  if (!is.null(c)) {
    print(paste("Killing", c))
    invisible(kill(parallel:::selectChildren(timeout = 0)))
  }
}
```





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123700234
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,51 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
[... same hunk as quoted in the previous message ...]
+  # Only the process IDs of exited children are returned and the termination is attempted below.
+  children <- parallel:::selectChildren(timeout = 0)
+  if (is.integer(children)) {
+    # If it is PIDs, there are workers that exited but were not terminated. Attempts to terminate
+    # them by sending SIGUSR1.
+    lapply(children, function(child) {
--- End diff --

With the change above, it printed:

```
[1] "Wait for 4 seconds to test the last child ..."
[1] "child PID: 86866 and parent will kill given: -1"
[1] "child PID: 86865 and parent will kill given: -1"
[1] "child PID: 86864 and sent a data: arbitrary"
[1] "child PID: 86863 and sent a PID: 86863"
[1] "Wait for 7 seconds more to test the last child ..."
[1] "child PID: 86864 and sent a data: 123"
```

It looks correct.





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123688218
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,51 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connecion by default.
+selectTimeout <- NULL
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each 
child terminates
+  # itself, it appears that the resource is not released properly, that 
causes an unexpected
+  # termination of this daemon due to, for example, running out of file 
descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to 
retrieve children
+  # that are exited (but not terminated) and then sends a kill signal to 
terminate them properly
+  # in the parent.
+  #
+  # There are two paths that it attempts to send a signal to terminate the 
children in the parent.
+  #
+  #   1. Every second if any socket connection is not available and if 
there are child workers
+  # running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children 
every second if
+  # any worker is running or right before launching other worker children 
from the following
+  # new socket connection.
+
+  # Only the process IDs of exited children are returned and the 
termination is attempted below.
+  children <- parallel:::selectChildren(timeout = 0)
+  if (is.integer(children)) {
+# If it is PIDs, there are workers exited but not terminated. Attempts 
to terminate them
+# by setting SIGUSR1.
+lapply(children, function(child) {
--- End diff --

Definitely.





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-23 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123681647
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,40 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connecion by default.
+selectTimeout <- NULL
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
--- End diff --

right, but in the odd case that the JVM died before making the socket 
connection, this R process will wait forever?
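
For illustration, one way to bound that initial wait could look like the 
following (a sketch only, not the PR's code; the 60-second window and the 
decision to quit are assumptions):

```r
# Sketch: bound the first wait so the daemon can give up if the JVM
# never connects. Assumes `inputCon` is set up as in daemon.R; the
# 60-second window is arbitrary.
ready <- socketSelect(list(inputCon), timeout = 60)
if (!ready) {
  # No connection from the JVM within the window; assume it died.
  quit(save = "no", status = 1, runLast = FALSE)
}
```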






[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-23 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123686953
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,54 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connecion by default.
+selectTimeout <- NULL
+
+# Exit code that children send to the parent to indicate they exited.
+exitCode <- 1
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each 
child terminates
+  # itself, it appears that the resource is not released properly, that 
causes an unexpected
+  # termination of this daemon due to, for example, running out of file 
descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to 
retrieve children
+  # that are exited (but not terminated) and then sends a kill signal to 
terminate them properly
+  # in the parent.
+  #
+  # There are two paths that it attempts to send a signal to terminate the 
children in the parent.
+  #
+  #   1. Every second if any socket connection is not available and if 
there are child workers
+  # running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children 
every second if
+  # any worker is running or right before launching other worker children 
from the following
+  # new socket connection.
+
+  # Only the process IDs of exited children are returned and the 
termination is attempted below.
+  children <- parallel:::selectChildren(timeout = 0)
+  if (is.integer(children)) {
+# If it is PIDs, there are workers exited but not terminated. Attempts 
to terminate them
+# by setting SIGUSR1.
--- End diff --

nit: I think the comments on L59 and L63 can be updated to match the new 
behavior more closely - that we are checking that a proper exit code is sent




[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-23 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123686706
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,54 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connecion by default.
+selectTimeout <- NULL
+
+# Exit code that children send to the parent to indicate they exited.
+exitCode <- 1
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each 
child terminates
+  # itself, it appears that the resource is not released properly, that 
causes an unexpected
+  # termination of this daemon due to, for example, running out of file 
descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to 
retrieve children
+  # that are exited (but not terminated) and then sends a kill signal to 
terminate them properly
+  # in the parent.
+  #
+  # There are two paths that it attempts to send a signal to terminate the 
children in the parent.
+  #
+  #   1. Every second if any socket connection is not available and if 
there are child workers
+  # running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children 
every second if
+  # any worker is running or right before launching other worker children 
from the following
+  # new socket connection.
+
+  # Only the process IDs of exited children are returned and the 
termination is attempted below.
+  children <- parallel:::selectChildren(timeout = 0)
+  if (is.integer(children)) {
+# If it is PIDs, there are workers exited but not terminated. Attempts 
to terminate them
+# by setting SIGUSR1.
+lapply(children, function(child) {
+  # This data should be raw bytes if any data was sent from this child.
+  # Otherwise, this returns the PID.
+  data <- parallel:::readChild(child)
+  if (is.raw(data)) {
+# This checks if the data from this child is -1 that indecides an 
exited child.
--- End diff --

`-1` to exit code





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-23 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123687096
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,54 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connecion by default.
+selectTimeout <- NULL
+
+# Exit code that children send to the parent to indicate they exited.
+exitCode <- 1
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each 
child terminates
+  # itself, it appears that the resource is not released properly, that 
causes an unexpected
+  # termination of this daemon due to, for example, running out of file 
descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to 
retrieve children
+  # that are exited (but not terminated) and then sends a kill signal to 
terminate them properly
+  # in the parent.
+  #
+  # There are two paths that it attempts to send a signal to terminate the 
children in the parent.
+  #
+  #   1. Every second if any socket connection is not available and if 
there are child workers
+  # running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children 
every second if
+  # any worker is running or right before launching other worker children 
from the following
+  # new socket connection.
+
+  # Only the process IDs of exited children are returned and the 
termination is attempted below.
+  children <- parallel:::selectChildren(timeout = 0)
+  if (is.integer(children)) {
+# If it is PIDs, there are workers exited but not terminated. Attempts 
to terminate them
+# by setting SIGUSR1.
+lapply(children, function(child) {
+  # This data should be raw bytes if any data was sent from this child.
+  # Otherwise, this returns the PID.
+  data <- parallel:::readChild(child)
+  if (is.raw(data)) {
+# This checks if the data from this child is -1 that indecides an 
exited child.
--- End diff --

`indecides` -> `indicates`





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-23 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123682838
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,51 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connecion by default.
+selectTimeout <- NULL
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each 
child terminates
+  # itself, it appears that the resource is not released properly, that 
causes an unexpected
+  # termination of this daemon due to, for example, running out of file 
descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to 
retrieve children
+  # that are exited (but not terminated) and then sends a kill signal to 
terminate them properly
+  # in the parent.
+  #
+  # There are two paths that it attempts to send a signal to terminate the 
children in the parent.
+  #
+  #   1. Every second if any socket connection is not available and if 
there are child workers
+  # running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children 
every second if
+  # any worker is running or right before launching other worker children 
from the following
+  # new socket connection.
+
+  # Only the process IDs of exited children are returned and the 
termination is attempted below.
+  children <- parallel:::selectChildren(timeout = 0)
+  if (is.integer(children)) {
+# If it is PIDs, there are workers exited but not terminated. Attempts 
to terminate them
+# by setting SIGUSR1.
+lapply(children, function(child) {
--- End diff --

cool

could you also test without these sleeps (trying to see if there's any race 
condition)?

```
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  # Sys.sleep(3L)

  # Send no data
  parallel:::mcexit(0L)
} else {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
## Sys.sleep(3L)

# Send custom data
parallel:::sendMaster("arbitrary")
parallel:::sendMaster(123)
parallel:::mcexit(0L)
  } else {
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  ### Sys.sleep(3L)

  # Send explicit signal.
  parallel:::mcexit(0L, send = -1)
} else {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
### Sys.sleep(10L)

# Check if `readChild` waits
parallel:::mcexit(0L, send = -1)
  }
}
  }
}
```





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123675152
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,51 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connecion by default.
+selectTimeout <- NULL
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each 
child terminates
+  # itself, it appears that the resource is not released properly, that 
causes an unexpected
+  # termination of this daemon due to, for example, running out of file 
descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to 
retrieve children
+  # that are exited (but not terminated) and then sends a kill signal to 
terminate them properly
+  # in the parent.
+  #
+  # There are two paths that it attempts to send a signal to terminate the 
children in the parent.
+  #
+  #   1. Every second if any socket connection is not available and if 
there are child workers
+  # running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children 
every second if
+  # any worker is running or right before launching other worker children 
from the following
+  # new socket connection.
+
+  # Only the process IDs of exited children are returned and the 
termination is attempted below.
+  children <- parallel:::selectChildren(timeout = 0)
+  if (is.integer(children)) {
+# If it is PIDs, there are workers exited but not terminated. Attempts 
to terminate them
+# by setting SIGUSR1.
+lapply(children, function(child) {
--- End diff --

Also, I referred to the code in 
https://github.com/mllg/batchtools/blob/master/R/clusterFunctionsMulticore.R#L8-L18.





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123675088
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,51 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connecion by default.
+selectTimeout <- NULL
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each 
child terminates
+  # itself, it appears that the resource is not released properly, that 
causes an unexpected
+  # termination of this daemon due to, for example, running out of file 
descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to 
retrieve children
+  # that are exited (but not terminated) and then sends a kill signal to 
terminate them properly
+  # in the parent.
+  #
+  # There are two paths that it attempts to send a signal to terminate the 
children in the parent.
+  #
+  #   1. Every second if any socket connection is not available and if 
there are child workers
+  # running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children 
every second if
+  # any worker is running or right before launching other worker children 
from the following
+  # new socket connection.
+
+  # Only the process IDs of exited children are returned and the 
termination is attempted below.
+  children <- parallel:::selectChildren(timeout = 0)
+  if (is.integer(children)) {
+# If it is PIDs, there are workers exited but not terminated. Attempts 
to terminate them
+# by setting SIGUSR1.
+lapply(children, function(child) {
--- End diff --

I tested this logic as below:

```
vi tmp.R
```

copied and pasted

```r
kill <- function(children) {
  lapply(children, function(child) {
    data <- parallel:::readChild(child)
    if (is.raw(data)) {
      if (unserialize(data) == -1) {
        print(paste("child PID:", child, "and parent will kill given:", unserialize(data)))
        tools::pskill(child, tools::SIGUSR1)
      } else {
        print(paste("child PID:", child, "and sent a data:", unserialize(data)))
      }
    } else {
      print(paste("child PID:", child, "and sent a PID:", data))
    }
  })
}

p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  Sys.sleep(3L)

  # Send no data
  parallel:::mcexit(0L)
} else {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
Sys.sleep(3L)

# Send custom data
parallel:::sendMaster("arbitrary")
parallel:::sendMaster(123)
parallel:::mcexit(0L)
  } else {
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  Sys.sleep(3L)

  # Send explicit signal.
  parallel:::mcexit(0L, send = -1)
} else {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
Sys.sleep(10L)

# Check if `readChild` waits
parallel:::mcexit(0L, send = -1)
  }
}
  }
}

print("Wait for 4 seconds to test the last child ...")
Sys.sleep(4L)
invisible(kill(parallel:::selectChildren(timeout = 0)))

print("Wait for 7 seconds more to test the last child ...")
Sys.sleep(7L)
invisible(kill(parallel:::selectChildren(timeout = 0)))
```

```
[1] "Wait for 4 seconds to test the last child ..."
[1] "child PID: 7405 and parent will kill given: -1"
[1] "child PID: 7404 and sent a data: arbitrary"
[1] "child PID: 7403 and sent a PID: 7403"
[1] "Wait for 7 seconds more to test the last child ..."
[1] "child PID: 7406 and parent will kill given: -1"
[1] "child PID: 7404 and sent a data: 123"
```

and ran

```
Rscript tmp.R
```





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123633470
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,40 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connecion by default.
+selectTimeout <- NULL
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each 
child terminates
+  # itself, it appears that the resource is not released properly, that 
causes an unexpected
+  # termination of this daemon due to, for example, running out of file 
descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to 
retrieve children
+  # that are exited (but not terminated) and then sends a kill signal to 
terminate them properly
+  # in the parent.
+  #
+  # There are two paths that it attempts to send a signal to terminate the 
children in the parent.
+  #
+  #   1. Every second if any socket connection is not available and if 
there are child workers
+  # running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children 
every second if
+  # any worker is running or right before launching other worker children 
from the following
+  # new socket connection.
+
+  # Only the process IDs of exited children are returned and the 
termination is attempted below.
+  children <- parallel:::selectChildren(timeout = 0)
+  if (is.integer(children)) {
+# If it is PIDs, there are workers exited but not terminated. Attempts 
to terminate them
--- End diff --

Sure, let me take a look and be back soon.





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-22 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123564501
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,40 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connecion by default.
+selectTimeout <- NULL
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
--- End diff --

It's less of waiting indefinitely but it's waiting for input from the JVM 
socket -- I think this is the right behavior by design





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-22 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123569536
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,40 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connecion by default.
+selectTimeout <- NULL
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each 
child terminates
+  # itself, it appears that the resource is not released properly, that 
causes an unexpected
+  # termination of this daemon due to, for example, running out of file 
descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to 
retrieve children
+  # that are exited (but not terminated) and then sends a kill signal to 
terminate them properly
+  # in the parent.
+  #
+  # There are two paths that it attempts to send a signal to terminate the 
children in the parent.
+  #
+  #   1. Every second if any socket connection is not available and if 
there are child workers
+  # running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children 
every second if
+  # any worker is running or right before launching other worker children 
from the following
+  # new socket connection.
+
+  # Only the process IDs of exited children are returned and the 
termination is attempted below.
+  children <- parallel:::selectChildren(timeout = 0)
+  if (is.integer(children)) {
+# If it is PIDs, there are workers exited but not terminated. Attempts 
to terminate them
--- End diff --

Hmm @felixcheung this is a good point - The thing is I think what 
@HyukjinKwon says might be valid given the current code path -- i.e. the child 
processes do not write anything back to the master process until exit. However 
that seems pretty fragile going forward? What if we explicitly used 
`sendMaster` to send back an exit signal to the master?
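
The `sendMaster` idea above could be sketched as follows (illustrative only, 
Unix-specific, and relying on unexported `parallel` internals; the `exitCode` 
value and the one-second sleep are assumptions for the sketch):

```r
# Sketch: the child reports an explicit exit code over the pipe before
# exiting, so the parent can tell "worker exited" apart from worker data.
exitCode <- 1L

p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  # Child: finish work, then send the exit code and exit.
  parallel:::mcexit(0L, send = exitCode)
} else {
  Sys.sleep(1)  # illustrative: give the child time to exit
  children <- parallel:::selectChildren(timeout = 0)
  if (is.integer(children)) {
    data <- parallel:::readChild(children[[1]])
    if (is.raw(data) && identical(unserialize(data), exitCode)) {
      # Terminate the exited child in the parent, as daemon.R does.
      tools::pskill(children[[1]], tools::SIGUSR1)
    }
  }
}
```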





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123443750
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -47,9 +79,11 @@ while (TRUE) {
   close(inputCon)
--- End diff --

Added.





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123440068
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,40 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connecion by default.
+selectTimeout <- NULL
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each 
child terminates
+  # itself, it appears that the resource is not released properly, that 
causes an unexpected
+  # termination of this daemon due to, for example, running out of file 
descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to 
retrieve children
+  # that are exited (but not terminated) and then sends a kill signal to 
terminate them properly
+  # in the parent.
+  #
+  # There are two paths that it attempts to send a signal to terminate the 
children in the parent.
+  #
+  #   1. Every second if any socket connection is not available and if 
there are child workers
+  # running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children 
every second if
+  # any worker is running or right before launching other worker children 
from the following
+  # new socket connection.
+
+  # Only the process IDs of exited children are returned and the 
termination is attempted below.
+  children <- parallel:::selectChildren(timeout = 0)
+  if (is.integer(children)) {
+# If it is PIDs, there are workers exited but not terminated. Attempts 
to terminate them
--- End diff --

I think [this 
line](https://github.com/s-u/multicore/blob/e9d9bf21e6cf08e24cfe54d762379b4fa923765b/src/fork.c#L440)
 checks if any data from children is available via file descriptors (pipes I 
believe). 

The children do not send any data back to the parent (via the 
`parallel:::sendMaster` API, etc.). So `FD_ISSET` will only be true on the 
close of the pipe (as that is an event on the pipe), which I guess happens when 
`exit()` is called properly.
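As a hedged illustration of this behaviour (Unix only; it relies on `parallel` internals, so treat it as a sketch rather than a guaranteed contract -- it mirrors the test snippet further down this thread):

```r
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  # Child: stay silent for a while (no parallel:::sendMaster()), then exit.
  Sys.sleep(2L)
  parallel:::mcexit(0L)
}

# While the child is alive but silent, a zero-timeout select reports
# TRUE (timeout reached) -- no data is available on the pipe yet.
print(parallel:::selectChildren(timeout = 0))

Sys.sleep(3L)  # let the child reach exit()

# exit() closed the child's end of the pipe, so FD_ISSET fires and the
# child's PID is returned even though it never called sendMaster().
print(parallel:::selectChildren(timeout = 0))
```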





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-21 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123417186
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -47,9 +79,11 @@ while (TRUE) {
   close(inputCon)
--- End diff --

please add comment here to say "# reach here because this is a child 
process"





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-21 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123416154
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,40 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connecion by default.
+selectTimeout <- NULL
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
--- End diff --

hmm, this is pre-existing behavior, but generally I think waiting 
indefinitely for anything would be rather dangerous. probably would be good to 
follow up separately





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-21 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123416771
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,40 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connecion by default.
+selectTimeout <- NULL
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each 
child terminates
+  # itself, it appears that the resource is not released properly, that 
causes an unexpected
+  # termination of this daemon due to, for example, running out of file 
descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to 
retrieve children
+  # that are exited (but not terminated) and then sends a kill signal to 
terminate them properly
+  # in the parent.
+  #
+  # There are two paths that it attempts to send a signal to terminate the 
children in the parent.
+  #
+  #   1. Every second if any socket connection is not available and if 
there are child workers
+  # running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children 
every second if
+  # any worker is running or right before launching other worker children 
from the following
+  # new socket connection.
+
+  # Only the process IDs of exited children are returned and the 
termination is attempted below.
+  children <- parallel:::selectChildren(timeout = 0)
+  if (is.integer(children)) {
+# If it is PIDs, there are workers exited but not terminated. Attempts 
to terminate them
--- End diff --

If I understand correctly, children (from `parallel:::selectChildren()`) 
returned as an integer vector only indicates the list of forked child 
processes, but it does not indicate that a child process has "exited"?

When we are in a loop checking every second (i.e. selectTimeout == 1), it 
sounds to me like socketSelect could return in one of the following ways:
- socket available for reading (ready == TRUE)
- socket not available for reading, it's dead or exiting (ready == FALSE)
- socket not available for reading, but it's not ready to connect because, 
say, things are running slow or the host is busy etc., and it has effectively 
just timed out since selectTimeout (ready == FALSE)






[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-21 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r123419616
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,40 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connecion by default.
+selectTimeout <- NULL
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each 
child terminates
+  # itself, it appears that the resource is not released properly, that 
causes an unexpected
+  # termination of this daemon due to, for example, running out of file 
descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to 
retrieve children
+  # that are exited (but not terminated) and then sends a kill signal to 
terminate them properly
+  # in the parent.
+  #
+  # There are two paths that it attempts to send a signal to terminate the 
children in the parent.
+  #
+  #   1. Every second if any socket connection is not available and if 
there are child workers
+  # running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children 
every second if
+  # any worker is running or right before launching other worker children 
from the following
+  # new socket connection.
+
+  # Only the process IDs of exited children are returned and the 
termination is attempted below.
+  children <- parallel:::selectChildren(timeout = 0)
+  if (is.integer(children)) {
+# If it is PIDs, there are workers exited but not terminated. Attempts 
to terminate them
--- End diff --

right, I see your reference here 
https://github.com/apache/spark/pull/18320#discussion_r122639738
but I'm not 100% getting it when looking at the source code 
https://github.com/s-u/multicore/blob/e9d9bf21e6cf08e24cfe54d762379b4fa923765b/src/fork.c#L361





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-20 Thread HyukjinKwon
GitHub user HyukjinKwon reopened a pull request:

https://github.com/apache/spark/pull/18320

[SPARK-21093][R] Terminate R's worker processes in the parent of R's daemon 
to prevent a leak

## What changes were proposed in this pull request?

`mcfork` in R appears to open a pipe ahead of time, but the existing logic does 
not properly close it when executed hot. This leads to the failure of further 
forking due to the limit on the number of open files.

This hot execution path shows up particularly with `gapply`/`gapplyCollect`. 
For an unknown reason, this happens more easily on CentOS and could be 
reproduced on Mac too.

All the details are described in 
https://issues.apache.org/jira/browse/SPARK-21093

This PR proposes simply to terminate R's worker processes in the parent of 
R's daemon to prevent a leak.
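In outline, the parent-side cleanup this PR adds looks like the following sketch (simplified from the diff in this thread; not verbatim daemon.R code):

```r
# Collect PIDs of children that have exited but are not yet terminated.
# selectChildren(timeout = 0) returns an integer vector of such PIDs,
# TRUE on a plain timeout, or NULL when there are no children.
children <- parallel:::selectChildren(timeout = 0)
if (is.integer(children)) {
  invisible(lapply(children, function(pid) {
    # Terminate in the parent so pipes/file descriptors are released
    # properly (see SPARK-21093).
    tools::pskill(pid, tools::SIGUSR1)
  }))
}
```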

## How was this patch tested?

I ran the codes below on both CentOS and Mac with that configuration 
disabled/enabled.

```r
df <- createDataFrame(list(list(1L, 1, "1", 0.1)), c("a", "b", "c", "d"))
collect(gapply(df, "a", function(key, x) { x }, schema(df)))
collect(gapply(df, "a", function(key, x) { x }, schema(df)))
...  # 30 times
```

Also, now it passes R tests on CentOS as below:

```
SparkSQL functions: Spark package found in SPARK_HOME: .../spark

..

..

..

..

..


```


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HyukjinKwon/spark SPARK-21093

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18320.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18320


commit 6e57ed2931afc5aec8c4b4bef72c157abcb68c46
Author: hyukjinkwon 
Date:   2017-06-16T02:37:53Z

Terminates forked processed in the parent process

commit 4eadafe3f009b1c70956c08c99302c1da34db6d4
Author: hyukjinkwon 
Date:   2017-06-17T09:21:37Z

Fix typo (renaming missed)

commit 18b3ee9a66df40658074511558f0cd36fc102df7
Author: hyukjinkwon 
Date:   2017-06-17T10:54:57Z

Rename x to c in lapply

commit 72ab1f2e8cafa1d5249a09279825444e3ca38b39
Author: hyukjinkwon 
Date:   2017-06-19T12:21:08Z

Update comments to describe the behaviour change

commit 6cba54c243123d25f479363d3dfd7eb92bb25599
Author: hyukjinkwon 
Date:   2017-06-20T01:02:26Z

Do not check every second if there is no worker running

commit 4954008884ff02a9eae9ea50586e86e8923fc593
Author: hyukjinkwon 
Date:   2017-06-20T09:04:03Z

Address comment

commit f3f57e46868e66b8f50268910c1eff494638059d
Author: hyukjinkwon 
Date:   2017-06-20T09:30:56Z

Fix comments

commit 04bb37a6d8d4387365f6d46cb8e2c6fbe912351d
Author: hyukjinkwon 
Date:   2017-06-20T09:32:29Z

Fix comments

commit d6f0ff275abd3b7641210427eea955c9f0ea8d86
Author: hyukjinkwon 
Date:   2017-06-20T09:39:21Z

Add more comments

commit 8b48274dc565dc5c6722e983c55494b0067bda72
Author: Hyukjin Kwon 
Date:   2017-06-20T10:00:05Z

Fix a typo







[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-20 Thread HyukjinKwon
Github user HyukjinKwon closed the pull request at:

https://github.com/apache/spark/pull/18320





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122879650
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,42 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+selectTimeout <- function() {
+  if (is.null(parallel:::selectChildren(timeout = 0))) {
+# Wait a socket connection indefinitely if there are no workers 
running.
+NULL
--- End diff --

Sure, let me check this out soon.





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-19 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122875289
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,42 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+selectTimeout <- function() {
+  if (is.null(parallel:::selectChildren(timeout = 0))) {
+# Wait a socket connection indefinitely if there are no workers 
running.
+NULL
--- End diff --

Nice - this is in line with what I was thinking. One thing: can we maintain 
some state in our own code to avoid calling `select` twice? What I mean is, if 
`finishedChildren` was `NULL` (based on [1]), then we set a flag saying `timeout 
<- NULL` -- next time we fork some children we set this back to `1`. Will that 
work, or am I missing something?

[1] 
https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/children.html
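The flag idea boils down to deriving the next select timeout from whatever `selectChildren()` last returned. A minimal sketch, with a hypothetical helper name (not code from this PR):

```r
# Decide the next socketSelect() timeout from the children state:
# NULL (wait indefinitely) when there are no children, 1 second otherwise.
nextSelectTimeout <- function(childrenState) {
  if (is.null(childrenState)) NULL else 1
}

# No children -> block indefinitely on the socket.
stopifnot(is.null(nextSelectTimeout(NULL)))
# Exited children (PIDs) or a plain timeout (TRUE) -> poll every second.
stopifnot(identical(nextSelectTimeout(c(101L, 102L)), 1))
stopifnot(identical(nextSelectTimeout(TRUE), 1))
```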





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122861395
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,42 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+selectTimeout <- function() {
+  if (is.null(parallel:::selectChildren(timeout = 0))) {
--- End diff --

Multiple children tests can be done as below:

copy and paste this instead

```r
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  Sys.sleep(3L)
  parallel:::mcexit(0L)
} else {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
Sys.sleep(3L)
parallel:::mcexit(0L)
  }
}

print("TRUE - the timeout was reached")
print("parallel:::selectChildren(timeout = 0):")
print(parallel:::selectChildren(timeout = 0))
Sys.sleep(4L)

print("PID - child is exited (but not terminated)")
print("parallel:::selectChildren(timeout = 0):")
print(parallel:::selectChildren(timeout = 0))

invisible(lapply(parallel:::selectChildren(timeout = 0), function(x) { tools::pskill(x, tools::SIGUSR1) }))
Sys.sleep(1L)

print("NULL - if there are no children")
print("parallel:::selectChildren(timeout = 0):")
print(parallel:::selectChildren(timeout = 0))
```





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122860937
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,42 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+selectTimeout <- function() {
+  if (is.null(parallel:::selectChildren(timeout = 0))) {
--- End diff --

And ... just in case, these are can be checked via both commands as below:

```
watch -n 0.01 "ps -fe | grep /exec/R"
```

checks forked processes.

```
watch -n 0.01 "lsof -c R | wc -l"
```

check open files (and file descriptors & pipes).





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122860370
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,42 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+selectTimeout <- function() {
+  if (is.null(parallel:::selectChildren(timeout = 0))) {
+# Wait a socket connection indefinitely if there are no workers 
running.
+NULL
--- End diff --

https://stat.ethz.ch/R-manual/R-devel/library/base/html/socketSelect.html

> NULL means wait indefinitely.





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122860310
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,42 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+selectTimeout <- function() {
+  if (is.null(parallel:::selectChildren(timeout = 0))) {
--- End diff --

This can be tested as below:

```r
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  Sys.sleep(3L)
  parallel:::mcexit(0L)
}

print("TRUE - the timeout was reached")
print("parallel:::selectChildren(timeout = 0):")
print(parallel:::selectChildren(timeout = 0))
Sys.sleep(4L)

print("PID - child is exited (but not terminated)")
print("parallel:::selectChildren(timeout = 0):")
print(parallel:::selectChildren(timeout = 0))

invisible(lapply(parallel:::selectChildren(timeout = 0), function(x) { tools::pskill(x, tools::SIGUSR1) }))
Sys.sleep(1L)

print("NULL - if there are no children")
print("parallel:::selectChildren(timeout = 0):")
print(parallel:::selectChildren(timeout = 0))
```

Per the documentation - 
https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/children.html

> selectChildren returns `TRUE` if the timeout was reached, `FALSE` if an 
error occurred (e.g., if the master process was interrupted) or an integer 
vector of process IDs with children that have data available, or `NULL` if 
there are no children.

I also manually built this and checked that it really waits indefinitely, 
verifying with print statements,

via running 

```r
df <- createDataFrame(list(list(1L, 1, "1", 0.1)), c("a", "b", "c", "d"))
collect(gapply(df, "a", function(key, x) { x }, schema(df)))
collect(gapply(df, "a", function(key, x) { x }, schema(df)))
...  # 30 times
```





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122855545
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
--- End diff --

BTW, the old behaviour was simply to hang and wait there indefinitely.

https://stat.ethz.ch/R-manual/R-devel/library/base/html/socketSelect.html

> numeric or NULL. Time in seconds to wait for a socket to become 
available; NULL means wait indefinitely.





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122848957
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
--- End diff --

Yea, I got your point and was taking a look. I think it's probably feasible via 
`parallel:::children`. I will give it a shot here and report back.





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-19 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122847863
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
--- End diff --

To make my point a bit more clear: if there are no workers running, we can 
block on this `select` and don't need the one-second timeout in that case?





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122843971
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,30 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Note that the children should be terminated in the parent. If each 
child terminates
+  # itself, it appears that the resource is not released properly, that 
causes an unexpected
+  # termination of this daemon due to, for example, running out of file 
descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to 
retrieve children
+  # that are exited (but not terminated) and then sends a kill signal to 
terminate them properly
+  # in the parent.
+  #
+  # There are two paths that it sends a signal to terminate the children 
in the parent.
+  #
+  #   1. Every second if any socket connection is not available.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent sends the signal to children every second 
or right before
+  # launching other worker children from the following new socket 
connection.
+  #
+  # Only the process IDs of exited children are returned and the 
termination is attempted below.
+  finishedChildren <- parallel:::selectChildren(timeout = 0)
--- End diff --

This does not block at all. I tested this - 
https://github.com/apache/spark/pull/18320#discussion_r122605437 for sure. Let 
me double check.





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-19 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122804718
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,30 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Note that the children should be terminated in the parent. If each 
child terminates
+  # itself, it appears that the resource is not released properly, that 
causes an unexpected
+  # termination of this daemon due to, for example, running out of file 
descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to 
retrieve children
+  # that are exited (but not terminated) and then sends a kill signal to 
terminate them properly
+  # in the parent.
+  #
+  # There are two paths that it sends a signal to terminate the children 
in the parent.
+  #
+  #   1. Every second if any socket connection is not available.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent sends the signal to children every second 
or right before
+  # launching other worker children from the following new socket 
connection.
+  #
+  # Only the process IDs of exited children are returned and the 
termination is attempted below.
+  finishedChildren <- parallel:::selectChildren(timeout = 0)
--- End diff --

Does a timeout of `0` not wait at all, or does it block until it gets all 
the results?





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-19 Thread shivaram
Github user shivaram commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122803981
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
--- End diff --

@HyukjinKwon Do we know what the old behavior was doing? I'm wondering whether we would now spend more time in the loop below and be less responsive to the `inputCon`. One thing -- can we control the timeout based on whether or not there are any children running?
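For reference, one way to implement that is sketched below (my own simplified sketch of the idea; the merged version of this PR introduced a `selectTimeout` variable along similar lines, where `NULL` makes `socketSelect()` wait indefinitely):

```r
# Wait indefinitely when there are no children to reap; switch to a
# 1-second poll only while forked children are outstanding.
selectTimeout <- NULL

while (TRUE) {
  ready <- socketSelect(list(inputCon), timeout = selectTimeout)

  # parallel:::children() is an internal API listing the current child
  # processes; poll only while some exist.
  selectTimeout <- if (length(parallel:::children()) > 0) 1 else NULL

  # ... reap finished children, then handle `ready` as before ...
}
```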





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122695789
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,30 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Note that the children should be terminated in the parent. If each child terminates
+  # itself, it appears that the resources are not released properly, which causes an
+  # unexpected termination of this daemon due to, for example, running out of file
+  # descriptors (see SPARK-21093). Therefore, the current implementation tries to retrieve
+  # children that have exited (but have not been terminated) and then sends a kill signal
+  # to terminate them properly in the parent.
+  #
+  # There are two paths by which a signal is sent to terminate the children in the parent.
+  #
+  #   1. Every second, if no socket connection is available.
+  #   2. Right after a socket connection becomes available.
+  #
+  # In other words, the parent sends the signal to the children every second, or right
+  # before launching other worker children from the following new socket connection.
+  #
+  # Only the process IDs of exited children are returned, and their termination is
+  # attempted below.
+  finishedChildren <- parallel:::selectChildren(timeout = 0)
--- End diff --

This is 0 by default, but I added it explicitly to prevent confusion.





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122687051
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Terminate R workers in the parent process.
+  finishedChildren <- parallel:::selectChildren()
--- End diff --

@felixcheung, I tested with the change below:

```diff
 port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
+Sys.sleep(5L)
 inputCon <- socketConnection(
 port = port, blocking = TRUE, open = "rb", timeout = connectionTimeout)
 outputCon <- socketConnection(
```

It looks fine. Does this deal with your concern?





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122639738
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Terminate R workers in the parent process.
+  finishedChildren <- parallel:::selectChildren()
--- End diff --

Definitely, for the comments.

Maybe I missed your point. Children only return their PID on exit, and `parallel:::selectChildren()` only returns the PIDs of children that have finished their work and called `parallel:::mcexit(0L)` (the related test was done in https://github.com/apache/spark/pull/18320#discussion_r122605437), to my knowledge. So, even if connecting to the JVM is delayed in `worker.R` and `RRunner.scala`, it won't matter.





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-19 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122630135
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Terminate R workers in the parent process.
+  finishedChildren <- parallel:::selectChildren()
--- End diff --

ah got it.
could you add some comments in the code to document the behavior?

also, I wonder if a delay in connecting back to the JVM will now cause the code to abort prematurely?





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122609234
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Terminate R workers in the parent process.
+  finishedChildren <- parallel:::selectChildren()
--- End diff --

The related test was performed here: https://github.com/apache/spark/pull/18320#discussion_r122605437





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122609177
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Terminate R workers in the parent process.
+  finishedChildren <- parallel:::selectChildren()
--- End diff --

Ah, yes. It looks confusing. Let me add more comments around here. I intended to do ...

A: When data has arrived in `daemon.R`:
 - `ready` is `TRUE`; terminate the children, `worker.R`, if they have finished their work.
 - Launch other `worker.R` children.

B: When data has not arrived in `daemon.R`:
 - `ready` is `FALSE`; terminate the children, `worker.R`, if they have finished their work, every second.

If we do this within `if (ready) {`, the child processes will remain until case A happens.

Forked children take some time to do their jobs, so if the parent checks for finished children right after forking, `parallel:::selectChildren()` will probably not return the finished children's PIDs.
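Putting the A and B cases together, the intended loop shape is roughly as follows (my own simplified sketch of the idea, not the actual `daemon.R`):

```r
while (TRUE) {
  # Case B: wake up at least once a second even when no work arrives.
  ready <- socketSelect(list(inputCon), timeout = 1)

  # Reap children that have already called mcexit(0L); a zero timeout
  # polls without blocking, so this is cheap when nothing has finished.
  finishedChildren <- parallel:::selectChildren(timeout = 0)
  if (is.integer(finishedChildren)) {
    lapply(finishedChildren, function(pid) tools::pskill(pid, tools::SIGUSR1))
  }

  # Case A: the reaping above also runs right before forking a new worker.
  if (ready) {
    # ... fork a worker with parallel:::mcfork() and source(script) ...
  }
}
```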







[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-18 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122608540
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Terminate R workers in the parent process.
+  finishedChildren <- parallel:::selectChildren()
--- End diff --

why not do this under `if (ready) {` similar to before?





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122606765
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -47,8 +55,6 @@ while (TRUE) {
   close(inputCon)
   Sys.setenv(SPARKR_WORKER_PORT = port)
   try(source(script))
-  # Set SIGUSR1 so that child can exit
-  tools::pskill(Sys.getpid(), tools::SIGUSR1)
   parallel:::mcexit(0L)
--- End diff --

BTW, to my knowledge, this should work alone, at least in C (though I didn't test it). Even if this is a bug in R and it is fixed in the future, I guess the current logic would still work as it stands.

```R
arbitrary <- 
tools::pskill(arbitrary, tools::SIGUSR1)
```

https://stat.ethz.ch/R-manual/R-devel/library/tools/html/pskill.html

> it silently ignores invalid values of its arguments, including zero or negative pids.
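A quick sanity check of that documented behavior (my own sketch; the semantics come from `?tools::pskill`, which says invalid, zero, or negative pids are silently ignored):

```r
# Per the documentation, zero and negative pids are ignored rather than
# being passed through to kill(), so a stale or invalid pid is safe here.
tools::pskill(0, tools::SIGUSR1)         # invalid pid: silently ignored
tools::pskill(-42, tools::SIGUSR1)       # negative pid: silently ignored
tools::pskill(99999999, tools::SIGUSR1)  # nonexistent process: no error is raised
```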





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122606530
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -47,8 +55,6 @@ while (TRUE) {
   close(inputCon)
   Sys.setenv(SPARKR_WORKER_PORT = port)
   try(source(script))
-  # Set SIGUSR1 so that child can exit
-  tools::pskill(Sys.getpid(), tools::SIGUSR1)
   parallel:::mcexit(0L)
--- End diff --

`mcexit` alone does not terminate the child.


With terminal A:

```R
R
```

With terminal B:

```bash
watch -n 0.01 "ps -fe | grep /bin/exec/R"
```

with terminal A:

```R
for(i in 0:100) {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
parallel:::mcexit(0L)
  }
}
```







[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122605437
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Terminate R workers in the parent process.
+  finishedChildren <- parallel:::selectChildren()
--- End diff --

This can be tested as below:

```bash
vi tmp.R
```

copy and paste

```R
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  Sys.sleep(3L)
  parallel:::mcexit(0L)
}
print(parallel:::selectChildren())
Sys.sleep(4L)
print(parallel:::selectChildren())
```

This prints 

```bash
[1] TRUE
[1] 2505
```

For the return values, please refer to https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/children.html.

The resource leak can be tested as below:

```R
for(i in 0:10) {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
parallel:::mcexit(0L)
  }
  
  exitChildren <- parallel:::selectChildren()
  if (is.integer(exitChildren)) {
lapply(exitChildren, function(x) { tools::pskill(x, tools::SIGUSR1) })
  }
}
```

The original code below:

```R
for(i in 0:10) {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
tools::pskill(Sys.getpid(), tools::SIGUSR1)
parallel:::mcexit(0L)
  }
}
```

will probably complain in `mcfork` due to a lack of resources (e.g., running out of file descriptors).

The actual number can be checked via `watch -n 0.01 "lsof -c R | wc -l"` in another terminal.





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122605188
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
--- End diff --

The logic here can be tested as below:

With a terminal (terminal A) running the command as below:

```bash
nc -l 
```
With another terminal (terminal B), run the code below in an R shell.

```R
inputCon <- socketConnection(port = "", open = "rb", blocking = TRUE)  
print(socketSelect(list(inputCon), timeout = 1))
```

This prints `FALSE`, as no data is available at first.

In terminal A, if any character is typed first, this returns `TRUE` in terminal B on another execution.





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-17 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122564740
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Terminate R workers in the parent process.
+  finishedChildren <- parallel:::selectChildren()
--- End diff --

The `timeout` for `selectChildren` appears to be 0 by default, so this does not look like it would hang here.





[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...

2017-06-17 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18320#discussion_r122564686
  
--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
 port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
--- End diff --

So, I intended to terminate the children every second _or_ right before launching other workers.

