[GitHub] spark pull request: [SPARK-12871][SQL] Support to specify the opti...

2016-01-19 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/10805#discussion_r50206066
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala ---
@@ -107,3 +114,28 @@ private[csv] object ParseModes {
     true // We default to permissive is the mode string is not valid
   }
 }
+
+private[csv] object CSVCompressionCodecs {
+  private val shortCompressionCodecNames = Map(
+    "bzip2" -> classOf[BZip2Codec].getName,
+    "gzip" -> classOf[GzipCodec].getName,
+    "lz4" -> classOf[Lz4Codec].getName,
+    "snappy" -> classOf[SnappyCodec].getName)
+
+  /**
+   * Return the full version of the given codec class.
+   * If it is already a class name, just return it.
+   */
+  def getCodecClassName(name: String): String = {
+    val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
+    val codecClassName = try {
+      // Validate the codec name
+      Utils.classForName(codecName)
+      Some(codecName)
+    } catch {
+      case e: ClassNotFoundException => None
+    }
+    codecClassName.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] " +
--- End diff --

should just put this throw inside the catch block and not bother with the 
Option stuff
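
For illustration, the suggested shape could look something like this (a sketch only, reusing the names from the diff, not the final patch):

```scala
def getCodecClassName(name: String): String = {
  val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
  try {
    // Validate the codec name; classForName throws if the class cannot be loaded.
    Utils.classForName(codecName)
    codecName
  } catch {
    case e: ClassNotFoundException =>
      throw new IllegalArgumentException(s"Codec [$codecName] is not available. " +
        s"Available codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
  }
}
```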





[GitHub] spark pull request: [SPARK-12871][SQL] Support to specify the opti...

2016-01-19 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/10805#discussion_r50206079
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala ---
@@ -107,3 +114,28 @@ private[csv] object ParseModes {
     true // We default to permissive is the mode string is not valid
   }
 }
+
+private[csv] object CSVCompressionCodecs {
+  private val shortCompressionCodecNames = Map(
+    "bzip2" -> classOf[BZip2Codec].getName,
+    "gzip" -> classOf[GzipCodec].getName,
+    "lz4" -> classOf[Lz4Codec].getName,
+    "snappy" -> classOf[SnappyCodec].getName)
+
+  /**
+   * Return the full version of the given codec class.
+   * If it is already a class name, just return it.
+   */
+  def getCodecClassName(name: String): String = {
+    val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
+    val codecClassName = try {
+      // Validate the codec name
+      Utils.classForName(codecName)
+      Some(codecName)
+    } catch {
+      case e: ClassNotFoundException => None
+    }
+    codecClassName.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] " +
+      s"is not available. Available codecs are ${shortCompressionCodecNames.keys.mkString(", ")}."))
--- End diff --

Probably change Available -> Known (since there may be more available)





[GitHub] spark pull request: [SPARK-9808] Remove spark.shuffle.consolidateF...

2015-08-10 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/8089#issuecomment-129673926
  
Heartbroken. Let's remove it.

Perhaps we can document its impending deprecation in 1.5.





[GitHub] spark pull request: [SPARK-8625] [Core] Propagate user exceptions ...

2015-07-30 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/7014#issuecomment-126415566
  
Looks good from my end.





[GitHub] spark pull request: [SPARK-9397] DataFrame should provide an API t...

2015-07-28 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/7717#issuecomment-125634257
  
Renamed to inputFiles and updated description.





[GitHub] spark pull request: [SPARK-9397] DataFrame should provide an API t...

2015-07-27 Thread aarondav
GitHub user aarondav opened a pull request:

https://github.com/apache/spark/pull/7717

[SPARK-9397] DataFrame should provide an API to find source data files if 
applicable

Certain applications would benefit from being able to inspect DataFrames that are produced directly by file-based data sources and find out their source data. For example, one might want to display to a user the size of the data underlying a table, or to copy or mutate it.

This PR exposes a `sourcePaths` method on DataFrame which attempts to 
discover the source data in a best-effort manner, by inspecting 
HadoopFsRelations and JSONRelations.
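
As a rough illustration of the best-effort idea (the relation types below are stand-ins for the real HadoopFsRelation/JSONRelation classes, not Spark's actual API):

```scala
// Stand-in relation types; the real method would pattern-match on Spark's
// HadoopFsRelation and JSONRelation inside the DataFrame's logical plan.
sealed trait Relation
case class FileBackedRelation(paths: Seq[String]) extends Relation
case class NonFileRelation() extends Relation

/** Best-effort: collect source paths from the relations we recognize, else return nothing. */
def sourcePaths(relations: Seq[Relation]): Seq[String] = relations.flatMap {
  case FileBackedRelation(paths) => paths
  case _                         => Seq.empty
}.distinct
```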

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aarondav/spark paths

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7717.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7717


commit 0acd3ad929f18706e05b5deba0e5acae8067c7a1
Author: Aaron Davidson aa...@databricks.com
Date:   2015-07-28T05:42:59Z

[SPARK-9397] DataFrame should provide an API to find source data files if 
applicable

Certain applications would benefit from being able to inspect DataFrames that are produced directly by file-based data sources and find out their source data. For example, one might want to display to a user the size of the data underlying a table, or to copy or mutate it.

This PR exposes a `sourcePaths` method on DataFrame which attempts to 
discover the source data in a best-effort manner, by inspecting 
HadoopFsRelations and JSONRelations.







[GitHub] spark pull request: [SPARK-9397] DataFrame should provide an API t...

2015-07-27 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/7717#issuecomment-125458969
  
cc @marmbrus @rxin about exposing a new DataFrame API.





[GitHub] spark pull request: [SPARK-8625] [Core] Propagate user exceptions ...

2015-07-22 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/7014#discussion_r35280503
  
--- Diff: core/src/test/scala/org/apache/spark/FailureSuite.scala ---
@@ -141,5 +141,73 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+  // Run a 3-task map job in which task 1 always fails with a exception message that
+  // depends on the failure number, and check that we get the last failure.
+  test("last failure cause is sent back to driver") {
+    sc = new SparkContext("local[1,2]", "test")
+    val data = sc.makeRDD(1 to 3, 3).map { x =>
+      FailureSuiteState.synchronized {
+        FailureSuiteState.tasksRun += 1
+        if (x == 3) {
+          FailureSuiteState.tasksFailed += 1
+          throw new UserException("oops",
+            new IllegalArgumentException("failed=" + FailureSuiteState.tasksFailed))
+        }
+      }
+      x * x
+    }
+    val thrown = intercept[SparkException] {
+      data.collect()
+    }
+    FailureSuiteState.synchronized {
+      assert(FailureSuiteState.tasksRun === 4)
+    }
+    assert(thrown.getClass === classOf[SparkException])
+    assert(thrown.getCause.getClass === classOf[UserException])
+    assert(thrown.getCause.getMessage === "oops")
+    assert(thrown.getCause.getCause.getClass === classOf[IllegalArgumentException])
+    assert(thrown.getCause.getCause.getMessage === "failed=2")
+    FailureSuiteState.clear()
+  }
+
+  test("failure cause stacktrace is sent back to driver if exception is not serializable") {
+    sc = new SparkContext("local", "test")
+    val data = sc.makeRDD(1 to 3).map(x => { throw new NonSerializableUserException; (x,
+      x) }).groupByKey(3)
+    val thrown = intercept[SparkException] {
+      data.collect()
+    }
+    assert(thrown.getClass === classOf[SparkException])
+    assert(thrown.getCause === null)
+    assert(thrown.getMessage.contains("NonSerializableUserException"))
+    FailureSuiteState.clear()
+  }
+
+  test("failure cause stacktrace is sent back to driver if exception is not deserializable") {
+    sc = new SparkContext("local", "test")
+    val data = sc.makeRDD(1 to 3).map(x => { throw new NonDeserializableUserException; (x,
--- End diff --

nit: maybe just
```scala
val data = sc.makeRDD(1 to 3).foreach { _ =>
  throw new NonDeserializationUserException
}.groupByKey(3)
```






[GitHub] spark pull request: [SPARK-8625] [Core] Propagate user exceptions ...

2015-07-22 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/7014#discussion_r35280447
  
--- Diff: core/src/test/scala/org/apache/spark/FailureSuite.scala ---
@@ -141,5 +141,73 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+  // Run a 3-task map job in which task 1 always fails with a exception message that
+  // depends on the failure number, and check that we get the last failure.
+  test("last failure cause is sent back to driver") {
+    sc = new SparkContext("local[1,2]", "test")
+    val data = sc.makeRDD(1 to 3, 3).map { x =>
+      FailureSuiteState.synchronized {
+        FailureSuiteState.tasksRun += 1
+        if (x == 3) {
+          FailureSuiteState.tasksFailed += 1
+          throw new UserException("oops",
+            new IllegalArgumentException("failed=" + FailureSuiteState.tasksFailed))
+        }
+      }
+      x * x
+    }
+    val thrown = intercept[SparkException] {
+      data.collect()
+    }
+    FailureSuiteState.synchronized {
+      assert(FailureSuiteState.tasksRun === 4)
+    }
+    assert(thrown.getClass === classOf[SparkException])
+    assert(thrown.getCause.getClass === classOf[UserException])
+    assert(thrown.getCause.getMessage === "oops")
+    assert(thrown.getCause.getCause.getClass === classOf[IllegalArgumentException])
+    assert(thrown.getCause.getCause.getMessage === "failed=2")
+    FailureSuiteState.clear()
+  }
+
+  test("failure cause stacktrace is sent back to driver if exception is not serializable") {
+    sc = new SparkContext("local", "test")
+    val data = sc.makeRDD(1 to 3).map(x => { throw new NonSerializableUserException; (x,
+      x) }).groupByKey(3)
+    val thrown = intercept[SparkException] {
+      data.collect()
+    }
+    assert(thrown.getClass === classOf[SparkException])
+    assert(thrown.getCause === null)
+    assert(thrown.getMessage.contains("NonSerializableUserException"))
+    FailureSuiteState.clear()
+  }
+
+  test("failure cause stacktrace is sent back to driver if exception is not deserializable") {
+    sc = new SparkContext("local", "test")
+    val data = sc.makeRDD(1 to 3).map(x => { throw new NonDeserializableUserException; (x,
+      x) }).groupByKey(3)
+    val thrown = intercept[SparkException] {
+      data.collect()
+    }
+    assert(thrown.getClass === classOf[SparkException])
+    assert(thrown.getCause === null)
+    assert(thrown.getMessage.contains("NonDeserializableUserException"))
+    FailureSuiteState.clear()
+  }
+
   // TODO: Need to add tests with shuffle fetch failures.
 }
+
+class UserException(message: String, cause: Throwable)
+  extends RuntimeException(message, cause)
+
+class NonSerializableUserException extends RuntimeException {
+  val nonSerializableInstanceVariable = new NonSerializable
+}
+
+class NonDeserializableUserException extends RuntimeException {
+  private def readObject(in: ObjectInputStream): Unit = {
+    throw new IOException("Intentional exception during deserialization.");
--- End diff --

nit: semicolon





[GitHub] spark pull request: [SPARK-8625] [Core] Propagate user exceptions ...

2015-07-22 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/7014#discussion_r35280981
  
--- Diff: core/src/test/scala/org/apache/spark/FailureSuite.scala ---
@@ -141,5 +141,73 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+  // Run a 3-task map job in which task 1 always fails with a exception message that
+  // depends on the failure number, and check that we get the last failure.
+  test("last failure cause is sent back to driver") {
+    sc = new SparkContext("local[1,2]", "test")
+    val data = sc.makeRDD(1 to 3, 3).map { x =>
+      FailureSuiteState.synchronized {
+        FailureSuiteState.tasksRun += 1
+        if (x == 3) {
+          FailureSuiteState.tasksFailed += 1
+          throw new UserException("oops",
+            new IllegalArgumentException("failed=" + FailureSuiteState.tasksFailed))
+        }
+      }
+      x * x
+    }
+    val thrown = intercept[SparkException] {
+      data.collect()
+    }
+    FailureSuiteState.synchronized {
+      assert(FailureSuiteState.tasksRun === 4)
+    }
+    assert(thrown.getClass === classOf[SparkException])
+    assert(thrown.getCause.getClass === classOf[UserException])
+    assert(thrown.getCause.getMessage === "oops")
+    assert(thrown.getCause.getCause.getClass === classOf[IllegalArgumentException])
+    assert(thrown.getCause.getCause.getMessage === "failed=2")
+    FailureSuiteState.clear()
+  }
+
+  test("failure cause stacktrace is sent back to driver if exception is not serializable") {
+    sc = new SparkContext("local", "test")
+    val data = sc.makeRDD(1 to 3).map(x => { throw new NonSerializableUserException; (x,
+      x) }).groupByKey(3)
+    val thrown = intercept[SparkException] {
+      data.collect()
+    }
+    assert(thrown.getClass === classOf[SparkException])
+    assert(thrown.getCause === null)
+    assert(thrown.getMessage.contains("NonSerializableUserException"))
+    FailureSuiteState.clear()
+  }
+
+  test("failure cause stacktrace is sent back to driver if exception is not deserializable") {
+    sc = new SparkContext("local", "test")
+    val data = sc.makeRDD(1 to 3).map(x => { throw new NonDeserializableUserException; (x,
--- End diff --

haha, whoops; that said, the groupByKey and collect should be unnecessary 
as the foreach should throw the exception directly
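
Putting the two comments together, the simplified version would presumably look more like this (again just a sketch):

```scala
// foreach is an action, so the exception is thrown directly when the job runs;
// no groupByKey/collect is needed to trigger it.
val thrown = intercept[SparkException] {
  sc.makeRDD(1 to 3).foreach { _ => throw new NonDeserializableUserException }
}
```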





[GitHub] spark pull request: [SPARK-8625] [Core] Propagate user exceptions ...

2015-07-17 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/7014#discussion_r34903771
  
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -127,6 +146,19 @@ case class ExceptionFailure(
   }
 }
 
+class ThrowableSerializationWrapper(var exception: Throwable) extends Serializable {
--- End diff --

Perhaps add a brief comment on the intention of this class (to avoid errors 
due to unserializable exceptions), and maybe indicate that exception will be 
null in this case.
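
For illustration, the kind of comment being asked for might read like this (the wording is a suggestion only; the body mirrors the diff):

```scala
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

/**
 * Wraps a Throwable so that a (de)serialization failure of the user's exception
 * does not lose the whole ExceptionFailure: if the wrapped exception cannot be
 * serialized or deserialized, `exception` is simply left null on the driver.
 */
class ThrowableSerializationWrapper(var exception: Throwable) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = out.writeObject(exception)
  private def readObject(in: ObjectInputStream): Unit = {
    try {
      exception = in.readObject().asInstanceOf[Throwable]
    } catch {
      case _: ClassNotFoundException | _: IOException => // leave exception as null
    }
  }
}
```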





[GitHub] spark pull request: [SPARK-8625] [Core] Propagate user exceptions ...

2015-07-17 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/7014#discussion_r34903699
  
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -97,13 +103,26 @@ case class ExceptionFailure(
     description: String,
     stackTrace: Array[StackTraceElement],
     fullStackTrace: String,
-    metrics: Option[TaskMetrics])
+    metrics: Option[TaskMetrics],
+    exceptionWrapper: Option[ThrowableSerializationWrapper])
   extends TaskFailedReason {
 
+  /**
+   * `preserveCause` is used to keep the exception itself so it is available to the
+   * driver. This may be set to `false` in the event that the exception is not in fact
+   * serializable.
+   */
+  private[spark] def this(e: Throwable, metrics: Option[TaskMetrics], preserveCause: Boolean) {
+    this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e), metrics,
+      if (preserveCause) Some(new ThrowableSerializationWrapper(e)) else None)
+  }
+
   private[spark] def this(e: Throwable, metrics: Option[TaskMetrics]) {
-    this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e), metrics)
+    this(e, metrics, true)
   }
 
+  def exception = exceptionWrapper.map(_.exception)
--- End diff --

I think this would actually be `Some(null)` in the exceptional case, no?
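
To spell it out: the wrapper is still a `Some` even when its `exception` field was nulled out during deserialization, so `map` yields `Some(null)`. A null-safe variant might look like this (illustrative, not necessarily the final fix):

```scala
// Option(w.exception) converts a null field into None instead of Some(null).
def exception: Option[Throwable] = exceptionWrapper.flatMap(w => Option(w.exception))
```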





[GitHub] spark pull request: [SPARK-8625] [Core] Propagate user exceptions ...

2015-07-17 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/7014#discussion_r34903555
  
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -127,6 +146,19 @@ case class ExceptionFailure(
   }
 }
 
+class ThrowableSerializationWrapper(var exception: Throwable) extends Serializable {
+  private def writeObject(out: ObjectOutputStream): Unit = {
+    out.writeObject(exception)
+  }
+  private def readObject(in: ObjectInputStream): Unit = {
+    try {
+      exception = in.readObject().asInstanceOf[Throwable]
+    } catch {
+      case _ : ClassNotFoundException | _ : IOException => // exception is null
--- End diff --

We could log the error at warning level so users know why they're not 
seeing the actual exception.
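
Something along these lines, assuming the enclosing class mixes in Spark's `Logging` trait (a sketch, not the merged code):

```scala
private def readObject(in: ObjectInputStream): Unit = {
  try {
    exception = in.readObject().asInstanceOf[Throwable]
  } catch {
    case e @ (_: ClassNotFoundException | _: IOException) =>
      // Tell the user why the original exception is missing rather than failing silently.
      logWarning("Task exception could not be deserialized", e)
  }
}
```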





[GitHub] spark pull request: [SPARK-8625] [Core] Propagate user exceptions ...

2015-07-17 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/7014#discussion_r34903975
  
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -97,13 +103,26 @@ case class ExceptionFailure(
     description: String,
     stackTrace: Array[StackTraceElement],
     fullStackTrace: String,
-    metrics: Option[TaskMetrics])
+    metrics: Option[TaskMetrics],
+    exceptionWrapper: Option[ThrowableSerializationWrapper])
--- End diff --

Would you mind adding a test for this guy? Could make a new Exception 
subtype which throws an exception in readObject() :)
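
That is, a fixture roughly like the one the test suite ends up adding: an exception that serializes fine but always fails in `readObject()`:

```scala
import java.io.{IOException, ObjectInputStream}

// Serializes on the executor without complaint, but blows up when the driver
// tries to deserialize it, exercising the wrapper's fallback path.
class NonDeserializableUserException extends RuntimeException {
  private def readObject(in: ObjectInputStream): Unit = {
    throw new IOException("Intentional exception during deserialization.")
  }
}
```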





[GitHub] spark pull request: [SPARK-8625] [Core] Propagate user exceptions ...

2015-07-17 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/7014#discussion_r34903614
  
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -97,13 +103,26 @@ case class ExceptionFailure(
     description: String,
     stackTrace: Array[StackTraceElement],
     fullStackTrace: String,
-    metrics: Option[TaskMetrics])
+    metrics: Option[TaskMetrics],
+    exceptionWrapper: Option[ThrowableSerializationWrapper])
   extends TaskFailedReason {
 
+  /**
+   * `preserveCause` is used to keep the exception itself so it is available to the
+   * driver. This may be set to `false` in the event that the exception is not in fact
+   * serializable.
+   */
+  private[spark] def this(e: Throwable, metrics: Option[TaskMetrics], preserveCause: Boolean) {
+    this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e), metrics,
+      if (preserveCause) Some(new ThrowableSerializationWrapper(e)) else None)
+  }
+
   private[spark] def this(e: Throwable, metrics: Option[TaskMetrics]) {
-    this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e), metrics)
+    this(e, metrics, true)
--- End diff --

perhaps use a named parameter, `preserveCause = true`





[GitHub] spark pull request: [SPARK-8644] Include call site in SparkExcepti...

2015-07-16 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/7028#issuecomment-122142207
  
Thanks! Merged into master.





[GitHub] spark pull request: [SPARK-8625] [Core] Propagate user exceptions ...

2015-07-16 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/7014#discussion_r34855831
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -300,8 +300,16 @@ private[spark] class Executor(
               m
             }
           }
-          val taskEndReason = new ExceptionFailure(t, metrics)
-          execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(taskEndReason))
+          val serializedTaskEndReason = {
+            try {
+              ser.serialize(new ExceptionFailure(t, metrics))
--- End diff --

(By the way, the particular exception I'm thinking of is Scala's 
MatchError, which actually includes the object which failed to match.)
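
A tiny illustration of why `MatchError` matters here: it captures the value that failed to match, which may itself be non-serializable (the socket mentioned below is just an example):

```scala
// scala.MatchError keeps a reference to the unmatched value, so if that value
// is not serializable (e.g. a Socket), the exception is not serializable either.
def describe(x: Any): String = x match {
  case s: String => s
  // no case for other types: passing e.g. a java.net.Socket here throws
  // MatchError(socket), which now drags the Socket along with it
}
```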





[GitHub] spark pull request: [SPARK-8625] [Core] Propagate user exceptions ...

2015-07-16 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/7014#discussion_r34855806
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -300,8 +300,16 @@ private[spark] class Executor(
               m
             }
           }
-          val taskEndReason = new ExceptionFailure(t, metrics)
-          execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(taskEndReason))
+          val serializedTaskEndReason = {
+            try {
+              ser.serialize(new ExceptionFailure(t, metrics))
--- End diff --

Is it possible that an exception does not throw a NotSerializableException 
but cannot be deserialized on the other side? This could be due to, say, class 
loader issues or other issues not caught during serialization time.





[GitHub] spark pull request: [SPARK-8644] Include call site in SparkExcepti...

2015-07-12 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/7028#issuecomment-120777385
  
@squito, I have updated the PR and simplified it greatly, thus removing 
most of the magic. Now we simply append the current stack trace inside 
DAGScheduler, with no special marker. Only someone looking very closely at the 
trace would observe something amiss, and they would see the comment pointing to 
the associated JIRA.

I think we should go with this approach over the wrapping approach for the 
simple reason that it is less likely to break callers. Some callers may fail to 
print the cause of the exception, which would have been fine before with no 
cause but now would not work. Some callers may inspect the message directly, 
which would be different (or we would have to duplicate). Some callers may 
display the exception to users, which would look significantly different if 
wrapped (rather than just made longer).

Please let me know if you have further concerns; I have run into this 
problem sufficiently often now to want to push for this to go in (and the 
workaround is really annoying -- adding prints or try-catches to every line 
which may have caused the issue!).
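
A minimal sketch of the idea described above, appending the driver-side call path to the scheduler's failure exception (names are illustrative; the exact patch may differ):

```scala
import org.apache.spark.SparkException

// The remote task failure is carried as text in the message, while the exception's
// own stack trace is set to the driver-side call path, so a plain printStackTrace()
// shows both the task failure and the user code that ran the job.
def jobFailure(failureReason: String, driverStack: Array[StackTraceElement]): SparkException = {
  val e = new SparkException(s"Job aborted due to stage failure: $failureReason\nDriver stacktrace:")
  e.setStackTrace(driverStack)
  e
}
```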





[GitHub] spark pull request: [SPARK-8644] Include call site in SparkExcepti...

2015-07-12 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/7028#issuecomment-120778945
  
fetch failure - jenkins test this please





[GitHub] spark pull request: [SPARK-8644] Include call site in SparkExcepti...

2015-07-03 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/7028#issuecomment-118447192
  
The problem with any approach that wraps the exception is that we no longer 
throw an exception of the original type; we instead always throw 
SparkExceptions (as in your PR). This could be considered an API-breaking 
change, and one that would only break at runtime.

The benefit of appending stack trace elements is that one can apply it to 
any exception without impact on the callers or DAGScheduler.

I think that it is very intuitive to join the stacks before and after an event loop, and it has the expected semantics of code leaving "the area I know" and entering into "scary Spark internals". However, I agree that the fact that we're joining a user-readable stack instead of the actual stack may be confusing.

A compromise would be to make the stack look like this:

```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.RuntimeException: uh-oh!
    at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38$$anonfun$apply$mcJ$sp$2.apply(DAGSchedulerSuite.scala:883)
    at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38$$anonfun$apply$mcJ$sp$2.apply(DAGSchedulerSuite.scala:883)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1627)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1774)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1774)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1294)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1285)
    at = DAGScheduler EventLoop Submission =.()
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:558)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1741)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1759)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1774)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1788)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1095)
    ...
```

The difference from the current version is that this also includes the Spark internals leading up to the actual EventLoop itself, which makes the stack appear more natural (but also uglier, due to the several indirections through runJob). The other difference is that it would return a more intuitive (but less useful) stack trace in situations where callSite is currently used in Spark (such as when starting a new job in Spark Streaming).

I believe this change would address your biggest concern (the stack trace looking magical) because, as I said, I think this stack trace is actually very readable, and I'm willing to trade off some usefulness for less magic, because it still gets us much farther than today.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33596669
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -466,23 +531,13 @@ private[worker] class Worker(
         case _ =>
           logDebug(s"Driver $driverId changed state to $state")
       }
-      master ! DriverStateChanged(driverId, state, exception)
+      sendToMaster(driverStageChanged)
--- End diff --

Rather, I meant the change to use the `driverStageChanged @` binding instead of recreating `DriverStateChanged(driverId, state, exception)`. It's not particularly important, but I was just curious if there was a reason.
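
For reference, the pattern-binder idiom in question, with a stand-in message type (not the real Worker code):

```scala
case class DriverStateChanged(driverId: String, state: String, exception: Option[Exception])

def sendToMaster(msg: Any): Unit = ()  // stand-in

def handle(msg: Any): Unit = msg match {
  // Binding the whole message with `@` lets the handler forward it as-is
  // instead of rebuilding an identical DriverStateChanged instance.
  case changed @ DriverStateChanged(driverId, state, _) =>
    println(s"Driver $driverId changed state to $state")
    sendToMaster(changed)
  case _ =>
}
```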





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117354387
  
Just one comment thanks to your proactive TODOifying :) LGTM, feel free to 
merge after.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33627528
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---
@@ -504,6 +518,7 @@ private[master] class Master(
   }
 
   private def completeRecovery() {
+    // TODO Why synchronized
--- End diff --

This was due to an earlier state in the code when this method could be 
invoked from a different thread. It can be safely removed now.





[GitHub] spark pull request: [SPARK-8644] Include call site in SparkExcepti...

2015-06-30 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/7028#issuecomment-117353792
  
retest this please





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33515404
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---
@@ -40,98 +37,139 @@ import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils}
  * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class AppClient(
-    actorSystem: ActorSystem,
+    rpcEnv: RpcEnv,
     masterUrls: Array[String],
     appDescription: ApplicationDescription,
     listener: AppClientListener,
     conf: SparkConf)
   extends Logging {
 
-  private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
+  private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
 
-  private val REGISTRATION_TIMEOUT = 20.seconds
+  private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3
 
-  private var masterAddress: Address = null
-  private var actor: ActorRef = null
+  private var endpoint: RpcEndpointRef = null
   private var appId: String = null
-  private var registered = false
-  private var activeMasterUrl: String = null
-
-  private class ClientActor extends Actor with ActorLogReceive with Logging {
-    var master: ActorSelection = null
-    var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
-    var alreadyDead = false  // To avoid calling listener.dead() multiple times
-    var registrationRetryTimer: Option[Cancellable] = None
-
-    override def preStart() {
-      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+  @volatile private var registered = false
+
+  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
+    with Logging {
+
+    private var master: Option[RpcEndpointRef] = None
+    // To avoid calling listener.disconnected() multiple times
+    private var alreadyDisconnected = false
+    @volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
+    @volatile private var registerMasterFutures: Array[JFuture[_]] = null
+    @volatile private var registrationRetryTimer: JScheduledFuture[_] = null
+
+    // A thread pool for registering with masters. Because registering with a master is a blocking
+    // action, this thread pool must be able to create masterRpcAddresses.size threads at the same
+    // time so that we can register with all masters.
+    private val registerMasterThreadPool = new ThreadPoolExecutor(
+      0,
+      masterRpcAddresses.size, // Make sure we can register with all masters at the same time
+      60L, TimeUnit.SECONDS,
+      new SynchronousQueue[Runnable](),
+      ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
+
+    // A scheduled executor for scheduling the registration actions
+    private val registrationRetryThread =
+      ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
+
+    override def onStart(): Unit = {
       try {
-        registerWithMaster()
+        registerWithMaster(1)
       } catch {
         case e: Exception =>
           logWarning("Failed to connect to master", e)
           markDisconnected()
-          context.stop(self)
+          stop()
       }
     }
 
-    def tryRegisterAllMasters() {
-      for (masterAkkaUrl <- masterAkkaUrls) {
-        logInfo("Connecting to master " + masterAkkaUrl + "...")
-        val actor = context.actorSelection(masterAkkaUrl)
-        actor ! RegisterApplication(appDescription)
+    /**
+     *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
+     */
+    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
+      for (masterAddress <- masterRpcAddresses) yield {
+        registerMasterThreadPool.submit(new Runnable {
+          override def run(): Unit = try {
+            if (registered) {
+              return
+            }
+            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
+            val masterRef =
+              rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
+            masterRef.send(RegisterApplication(appDescription, self))
+          } catch {
+            case ie: InterruptedException => // Cancelled
+            case NonFatal(e) => logError(e.getMessage, e)
+          }
+        })
       }
     }
 
-    def registerWithMaster() {
-      tryRegisterAllMasters()
-      import context.dispatcher
-      var

[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33518468
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -235,21 +250,47 @@ private[worker] class Worker(
      * still not safe if the old master recovers within this interval, but this is a much
      * less likely scenario.
      */
-    if (master != null) {
-      master ! RegisterWorker(
-        workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
-    } else {
-      // We are retrying the initial registration
-      tryRegisterAllMasters()
+    master match {
+      case Some(masterRef) =>
+        // registered == false && master != None means we lost the connection to master, so
+        // masterRef cannot be used and we need to recreate it again. Note: we must not set
+        // master to None due to the above comments.
+        if (registerMasterFutures != null) {
+          registerMasterFutures.foreach(_.cancel(true))
+        }
+        val masterAddress = masterRef.address
+        registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable {
+          override def run(): Unit =
--- End diff --

nit: braces





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33519252
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -302,27 +367,27 @@ private[worker] class Worker(
           // the directory is used by an application - check that the application is not running
           // when cleaning up
           val appIdFromDir = dir.getName
-          val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
+          val isAppStillRunning = appIds.contains(appIdFromDir)
           dir.isDirectory && !isAppStillRunning &&
-          !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
+          !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
         }.foreach { dir =>
           logInfo(s"Removing directory: ${dir.getPath}")
           Utils.deleteRecursively(dir)
         }
-      }
+      } (cleanupThreadExecutor)
 
-      cleanupFuture onFailure {
+      cleanupFuture.onFailure {
         case e: Throwable =>
           logError("App dir cleanup failed: " + e.getMessage, e)
-      }
+      } (cleanupThreadExecutor)
--- End diff --

nit: `}(`






[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33519250
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -302,27 +367,27 @@ private[worker] class Worker(
           // the directory is used by an application - check that the application is not running
           // when cleaning up
           val appIdFromDir = dir.getName
-          val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
+          val isAppStillRunning = appIds.contains(appIdFromDir)
           dir.isDirectory && !isAppStillRunning &&
-          !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
+          !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
         }.foreach { dir =>
           logInfo(s"Removing directory: ${dir.getPath}")
           Utils.deleteRecursively(dir)
         }
-      }
+      } (cleanupThreadExecutor)
--- End diff --

nit: `}(`





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33519340
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -466,23 +531,13 @@ private[worker] class Worker(
         case _ =>
           logDebug(s"Driver $driverId changed state to $state")
       }
-      master ! DriverStateChanged(driverId, state, exception)
+      sendToMaster(driverStageChanged)
--- End diff --

This change was not strictly necessary, right? Maybe it used to have a self 
reference.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33513145
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/Client.scala ---
@@ -36,20 +36,30 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils}
  * We currently don't support retry if submission fails. In HA mode, client will submit request to
  * all masters and see which one could handle it.
  */
-private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
-  extends Actor with ActorLogReceive with Logging {
-
-  private val masterActors = driverArgs.masters.map { m =>
-    context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system)))
-  }
-  private val lostMasters = new HashSet[Address]
-  private var activeMasterActor: ActorSelection = null
-
-  val timeout = RpcUtils.askTimeout(conf)
-
-  override def preStart(): Unit = {
-    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-
+private class ClientEndpoint(
+    override val rpcEnv: RpcEnv,
+    driverArgs: ClientArguments,
+    masterEndpoints: Seq[RpcEndpointRef],
+    conf: SparkConf)
+  extends ThreadSafeRpcEndpoint with Logging {
+
+  // A scheduled executor used to send messages at the specified time.
+  private val forwardMessageThread =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("client-forward-message")
+  // Used to provide the implicit parameter of `Future` methods.
+  private val forwardMessageExecutionContext =
+    ExecutionContext.fromExecutor(forwardMessageThread,
+      t => t match {
+        case ie: InterruptedException => // Exit normally
+        case e =>
--- End diff --

nit: `e: Throwable` explicitly





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33513862
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/Client.scala ---
@@ -82,29 +92,38 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
           driverArgs.cores,
           driverArgs.supervise,
           command)
-
-        // This assumes only one Master is active at a time
-        for (masterActor <- masterActors) {
-          masterActor ! RequestSubmitDriver(driverDescription)
-        }
+        ayncSendToMasterAndForwardReply[SubmitDriverResponse](
+          RequestSubmitDriver(driverDescription))
 
       case "kill" =>
         val driverId = driverArgs.driverId
-        // This assumes only one Master is active at a time
-        for (masterActor <- masterActors) {
-          masterActor ! RequestKillDriver(driverId)
-        }
+        ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
+    }
+  }
+
+  /**
+   * Send the message to master and forward the reply to self asynchronously.
+   */
+  private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
+    for (masterEndpoint <- masterEndpoints) {
+      masterEndpoint.ask[T](message).onComplete {
+        case Success(v) => self.send(v)
+        case Failure(e) =>
+          println(s"Error sending messages to master $masterEndpoint, exiting.")
--- End diff --

exiting? I wouldn't expect this to be a failure condition given some 
masters may be dead, but maybe the text is just out of date.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33513905
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/Client.scala ---
@@ -82,29 +92,38 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
           driverArgs.cores,
           driverArgs.supervise,
           command)
-
-        // This assumes only one Master is active at a time
-        for (masterActor <- masterActors) {
-          masterActor ! RequestSubmitDriver(driverDescription)
-        }
+        ayncSendToMasterAndForwardReply[SubmitDriverResponse](
+          RequestSubmitDriver(driverDescription))
 
       case "kill" =>
        val driverId = driverArgs.driverId
-        // This assumes only one Master is active at a time
-        for (masterActor <- masterActors) {
-          masterActor ! RequestKillDriver(driverId)
-        }
+        ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
+    }
+  }
+
+  /**
+   * Send the message to master and forward the reply to self asynchronously.
+   */
+  private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
+    for (masterEndpoint <- masterEndpoints) {
+      masterEndpoint.ask[T](message).onComplete {
+        case Success(v) => self.send(v)
+        case Failure(e) =>
+          println(s"Error sending messages to master $masterEndpoint, exiting.")
--- End diff --

also let's make this the error message (should we be using println?)





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33515502
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---
@@ -40,98 +37,139 @@ import org.apache.spark.util.{ActorLogReceive, 
RpcUtils, Utils, AkkaUtils}
  * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class AppClient(
-actorSystem: ActorSystem,
+rpcEnv: RpcEnv,
 masterUrls: Array[String],
 appDescription: ApplicationDescription,
 listener: AppClientListener,
 conf: SparkConf)
   extends Logging {
 
-  private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, 
AkkaUtils.protocol(actorSystem)))
+  private val masterRpcAddresses = 
masterUrls.map(RpcAddress.fromSparkURL(_))
 
-  private val REGISTRATION_TIMEOUT = 20.seconds
+  private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3
 
-  private var masterAddress: Address = null
-  private var actor: ActorRef = null
+  private var endpoint: RpcEndpointRef = null
   private var appId: String = null
-  private var registered = false
-  private var activeMasterUrl: String = null
-
-  private class ClientActor extends Actor with ActorLogReceive with 
Logging {
-var master: ActorSelection = null
-var alreadyDisconnected = false  // To avoid calling 
listener.disconnected() multiple times
-var alreadyDead = false  // To avoid calling listener.dead() multiple 
times
-var registrationRetryTimer: Option[Cancellable] = None
-
-override def preStart() {
-  context.system.eventStream.subscribe(self, 
classOf[RemotingLifecycleEvent])
+  @volatile private var registered = false
+
+  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends 
ThreadSafeRpcEndpoint
+with Logging {
+
+private var master: Option[RpcEndpointRef] = None
+// To avoid calling listener.disconnected() multiple times
+private var alreadyDisconnected = false
+@volatile private var alreadyDead = false // To avoid calling 
listener.dead() multiple times
+@volatile private var registerMasterFutures: Array[JFuture[_]] = null
+@volatile private var registrationRetryTimer: JScheduledFuture[_] = 
null
+
+// A thread pool for registering with masters. Because registering 
with a master is a blocking
+// action, this thread pool must be able to create 
masterRpcAddresses.size threads at the same
+// time so that we can register with all masters.
+private val registerMasterThreadPool = new ThreadPoolExecutor(
+  0,
+  masterRpcAddresses.size, // Make sure we can register with all masters at the same time
+  60L, TimeUnit.SECONDS,
+  new SynchronousQueue[Runnable](),
+  ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
+
+// A scheduled executor for scheduling the registration actions
+private val registrationRetryThread =
+  ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
+
+override def onStart(): Unit = {
   try {
-registerWithMaster()
+registerWithMaster(1)
   } catch {
 case e: Exception =
   logWarning("Failed to connect to master", e)
   markDisconnected()
-  context.stop(self)
+  stop()
   }
 }
 
-def tryRegisterAllMasters() {
-  for (masterAkkaUrl <- masterAkkaUrls) {
-logInfo("Connecting to master " + masterAkkaUrl + "...")
-val actor = context.actorSelection(masterAkkaUrl)
-actor ! RegisterApplication(appDescription)
+/**
+ *  Register with all masters asynchronously and returns an array 
`Future`s for cancellation.
+ */
+private def tryRegisterAllMasters(): Array[JFuture[_]] = {
+  for (masterAddress <- masterRpcAddresses) yield {
+registerMasterThreadPool.submit(new Runnable {
+  override def run(): Unit = try {
+if (registered) {
+  return
+}
+logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
+val masterRef =
+  rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
+masterRef.send(RegisterApplication(appDescription, self))
+  } catch {
+case ie: InterruptedException => // Cancelled
+case NonFatal(e) => logError(e.getMessage, e)
+  }
+})
   }
 }
 
-def registerWithMaster() {
-  tryRegisterAllMasters()
-  import context.dispatcher
-  var

[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33514893
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---
@@ -40,98 +37,139 @@ import org.apache.spark.util.{ActorLogReceive, 
RpcUtils, Utils, AkkaUtils}
  * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class AppClient(
-actorSystem: ActorSystem,
+rpcEnv: RpcEnv,
 masterUrls: Array[String],
 appDescription: ApplicationDescription,
 listener: AppClientListener,
 conf: SparkConf)
   extends Logging {
 
-  private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, 
AkkaUtils.protocol(actorSystem)))
+  private val masterRpcAddresses = 
masterUrls.map(RpcAddress.fromSparkURL(_))
 
-  private val REGISTRATION_TIMEOUT = 20.seconds
+  private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3
 
-  private var masterAddress: Address = null
-  private var actor: ActorRef = null
+  private var endpoint: RpcEndpointRef = null
   private var appId: String = null
-  private var registered = false
-  private var activeMasterUrl: String = null
-
-  private class ClientActor extends Actor with ActorLogReceive with 
Logging {
-var master: ActorSelection = null
-var alreadyDisconnected = false  // To avoid calling 
listener.disconnected() multiple times
-var alreadyDead = false  // To avoid calling listener.dead() multiple 
times
-var registrationRetryTimer: Option[Cancellable] = None
-
-override def preStart() {
-  context.system.eventStream.subscribe(self, 
classOf[RemotingLifecycleEvent])
+  @volatile private var registered = false
+
+  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends 
ThreadSafeRpcEndpoint
+with Logging {
+
+private var master: Option[RpcEndpointRef] = None
+// To avoid calling listener.disconnected() multiple times
+private var alreadyDisconnected = false
+@volatile private var alreadyDead = false // To avoid calling 
listener.dead() multiple times
--- End diff --

It's not generally true that ThreadSafeRpcEndpoints require their mutable 
state to be volatile, right? Perhaps this is just being modified from a 
separate thread pool?
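As a self-contained sketch of the distinction being drawn (class and method names are invented, not the PR's code): state confined to the endpoint's single message-processing thread needs no `@volatile`, while state also touched by the separate register-master thread pool does.

```scala
import java.util.concurrent.Executors

class EndpointLikeState {
  // Mutated only from the single message-processing thread, so a plain var is fine.
  private var alreadyDisconnected = false

  // Also written from tasks submitted to the registration pool below, i.e. from
  // threads outside the message loop, so the write needs guaranteed visibility.
  @volatile private var alreadyDead = false

  private val registerPool = Executors.newFixedThreadPool(2)

  def markDeadFromRegistrationThread(): Unit = {
    registerPool.submit(new Runnable {
      override def run(): Unit = { alreadyDead = true }
    })
  }
}
```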





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33519086
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
---
@@ -258,41 +299,65 @@ private[worker] class Worker(
 }
   }
 
+  /**
+   * Cancel last registration retry, or do nothing if no retry
+   */
+  private def cancelLastRegistrationRetry(): Unit = {
+if (registerMasterFutures != null) {
+  registerMasterFutures.foreach(_.cancel(true))
+  registerMasterFutures = null
+}
+registrationRetryTimer.foreach(_.cancel(true))
+registrationRetryTimer = None
+  }
+
   private def registerWithMaster() {
-// DisassociatedEvent may be triggered multiple times, so don't 
attempt registration
+// onDisconnected may be triggered multiple times, so don't attempt 
registration
 // if there are outstanding registration attempts scheduled.
 registrationRetryTimer match {
   case None =
 registered = false
-tryRegisterAllMasters()
+registerMasterFutures = tryRegisterAllMasters()
 connectionAttemptCount = 0
-registrationRetryTimer = Some {
-  
context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
-INITIAL_REGISTRATION_RETRY_INTERVAL, self, 
ReregisterWithMaster)
-}
+registrationRetryTimer = 
Some(forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
--- End diff --

The indentation of this block is a little sketchy, maybe we can put the 
`new Runnable` on the next line?
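For instance, a generic formatting sketch using a plain `ScheduledExecutorService` rather than the PR's scheduler (the task body is a placeholder):

```scala
import java.util.concurrent.{Executors, TimeUnit}

val scheduler = Executors.newSingleThreadScheduledExecutor()

// Break after the opening parenthesis so the anonymous Runnable starts on its
// own line instead of trailing the long assignment.
val registrationRetryTimer = Some(
  scheduler.scheduleAtFixedRate(
    new Runnable {
      override def run(): Unit = println("re-register with master")
    },
    10, 10, TimeUnit.SECONDS))
```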





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33514995
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---
@@ -40,98 +37,139 @@ import org.apache.spark.util.{ActorLogReceive, 
RpcUtils, Utils, AkkaUtils}
  * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class AppClient(
-actorSystem: ActorSystem,
+rpcEnv: RpcEnv,
 masterUrls: Array[String],
 appDescription: ApplicationDescription,
 listener: AppClientListener,
 conf: SparkConf)
   extends Logging {
 
-  private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, 
AkkaUtils.protocol(actorSystem)))
+  private val masterRpcAddresses = 
masterUrls.map(RpcAddress.fromSparkURL(_))
 
-  private val REGISTRATION_TIMEOUT = 20.seconds
+  private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3
 
-  private var masterAddress: Address = null
-  private var actor: ActorRef = null
+  private var endpoint: RpcEndpointRef = null
   private var appId: String = null
-  private var registered = false
-  private var activeMasterUrl: String = null
-
-  private class ClientActor extends Actor with ActorLogReceive with 
Logging {
-var master: ActorSelection = null
-var alreadyDisconnected = false  // To avoid calling 
listener.disconnected() multiple times
-var alreadyDead = false  // To avoid calling listener.dead() multiple 
times
-var registrationRetryTimer: Option[Cancellable] = None
-
-override def preStart() {
-  context.system.eventStream.subscribe(self, 
classOf[RemotingLifecycleEvent])
+  @volatile private var registered = false
+
+  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends 
ThreadSafeRpcEndpoint
+with Logging {
+
+private var master: Option[RpcEndpointRef] = None
+// To avoid calling listener.disconnected() multiple times
+private var alreadyDisconnected = false
+@volatile private var alreadyDead = false // To avoid calling 
listener.dead() multiple times
+@volatile private var registerMasterFutures: Array[JFuture[_]] = null
+@volatile private var registrationRetryTimer: JScheduledFuture[_] = 
null
+
+// A thread pool for registering with masters. Because registering 
with a master is a blocking
+// action, this thread pool must be able to create 
masterRpcAddresses.size threads at the same
+// time so that we can register with all masters.
+private val registerMasterThreadPool = new ThreadPoolExecutor(
+  0,
+  masterRpcAddresses.size, // Make sure we can register with all masters at the same time
+  60L, TimeUnit.SECONDS,
+  new SynchronousQueue[Runnable](),
+  ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
+
+// A scheduled executor for scheduling the registration actions
+private val registrationRetryThread =
+  ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
+
+override def onStart(): Unit = {
   try {
-registerWithMaster()
+registerWithMaster(1)
   } catch {
 case e: Exception =
   logWarning("Failed to connect to master", e)
   markDisconnected()
-  context.stop(self)
+  stop()
   }
 }
 
-def tryRegisterAllMasters() {
-  for (masterAkkaUrl <- masterAkkaUrls) {
-logInfo("Connecting to master " + masterAkkaUrl + "...")
-val actor = context.actorSelection(masterAkkaUrl)
-actor ! RegisterApplication(appDescription)
+/**
+ *  Register with all masters asynchronously and returns an array 
`Future`s for cancellation.
+ */
+private def tryRegisterAllMasters(): Array[JFuture[_]] = {
+  for (masterAddress <- masterRpcAddresses) yield {
+registerMasterThreadPool.submit(new Runnable {
+  override def run(): Unit = try {
+if (registered) {
+  return
+}
+logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
+val masterRef =
+  rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
+masterRef.send(RegisterApplication(appDescription, self))
+  } catch {
+case ie: InterruptedException => // Cancelled
+case NonFatal(e) => logError(e.getMessage, e)
--- End diff --

message should probably be like Failed to connect to $masterAddress
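i.e. roughly the following, with the exception still attached so the cause is not lost (a sketch; exact wording is up to the author):

```scala
case NonFatal(e) => logError(s"Failed to connect to master $masterAddress", e)
```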



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33518043
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
---
@@ -181,24 +190,31 @@ private[worker] class Worker(
 metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
   }
 
-  private def changeMaster(url: String, uiUrl: String) {
+  private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
 // activeMasterUrl it's a valid Spark url since we receive it from 
master.
-activeMasterUrl = url
+activeMasterUrl = masterRef.address.toSparkURL
 activeMasterWebUiUrl = uiUrl
-master = context.actorSelection(
-  Master.toAkkaUrl(activeMasterUrl, 
AkkaUtils.protocol(context.system)))
-masterAddress = Master.toAkkaAddress(activeMasterUrl, 
AkkaUtils.protocol(context.system))
+master = Some(masterRef)
 connected = true
 // Cancel any outstanding re-registration attempts because we found a 
new master
-registrationRetryTimer.foreach(_.cancel())
-registrationRetryTimer = None
+cancelLastRegistrationRetry()
   }
 
-  private def tryRegisterAllMasters() {
-for (masterAkkaUrl <- masterAkkaUrls) {
-  logInfo("Connecting to master " + masterAkkaUrl + "...")
-  val actor = context.actorSelection(masterAkkaUrl)
-  actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
+  private def tryRegisterAllMasters(): Array[JFuture[_]] = {
+for (masterAddress <- masterRpcAddresses) yield {
+  registerMasterThreadPool.submit(new Runnable {
+override def run(): Unit =
+  try {
+logInfo("Connecting to master " + masterAddress + "...")
+val masterEndpoint =
+  rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
+masterEndpoint.send(RegisterWorker(
+  workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+  } catch {
+case ie: InterruptedException => // Cancelled
+case NonFatal(e) => logError(e.getMessage, e)
--- End diff --

message should probably be like Failed to connect to master $masterAddress






[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33518507
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
---
@@ -235,21 +250,47 @@ private[worker] class Worker(
  * still not safe if the old master recovers within this interval, 
but this is a much
  * less likely scenario.
  */
-if (master != null) {
-  master ! RegisterWorker(
-workerId, host, port, cores, memory, webUi.boundPort, 
publicAddress)
-} else {
-  // We are retrying the initial registration
-  tryRegisterAllMasters()
+master match {
+  case Some(masterRef) =>
+// registered == false && master != None means we lost the connection to master, so
+// masterRef cannot be used and we need to recreate it again. Note: we must not set
+// master to None due to the above comments.
+if (registerMasterFutures != null) {
+  registerMasterFutures.foreach(_.cancel(true))
+}
+val masterAddress = masterRef.address
+registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable {
+  override def run(): Unit =
+try {
+  logInfo("Connecting to master " + masterAddress + "...")
+  val masterEndpoint =
+rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
+  masterEndpoint.send(RegisterWorker(
+workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+} catch {
+  case ie: InterruptedException => // Cancelled
+  case NonFatal(e) => logError(e.getMessage, e)
--- End diff --

message should probably be like Failed to connect to master $masterAddress
(can we merge this code path with the above?)
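One hypothetical way to share the two paths, assuming both ultimately just need to submit one registration task per master address (the helper name `tryRegisterMaster` is invented; the member names come from the quoted diff):

```scala
// Sketch only: both tryRegisterAllMasters() and the re-registration branch above
// could delegate here, so the connect-and-catch boilerplate lives in one place.
private def tryRegisterMaster(masterAddress: RpcAddress): JFuture[_] = {
  registerMasterThreadPool.submit(new Runnable {
    override def run(): Unit = {
      try {
        logInfo("Connecting to master " + masterAddress + "...")
        val masterEndpoint =
          rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
        masterEndpoint.send(RegisterWorker(
          workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logError(s"Failed to connect to master $masterAddress", e)
      }
    }
  })
}
```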





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33519394
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
---
@@ -510,13 +580,25 @@ private[worker] class Worker(
 }
   }
 
+  private def sendToMaster(message: Any): Unit = {
--- End diff --

also doc this guy about its behavior
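For example, a hedged stab at the kind of doc being asked for; the dropped-message behavior described here is an assumption for illustration, not quoted from the PR:

```scala
/**
 * Send a message to the currently connected master, if any.
 *
 * Assumed behavior for this sketch: if the worker is not connected to a master,
 * the message is dropped (and presumably logged) rather than queued.
 */
private def sendToMaster(message: Any): Unit = {
  // ... body as in the PR ...
}
```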





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33517979
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
---
@@ -181,24 +190,31 @@ private[worker] class Worker(
 metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
   }
 
-  private def changeMaster(url: String, uiUrl: String) {
+  private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
 // activeMasterUrl it's a valid Spark url since we receive it from 
master.
-activeMasterUrl = url
+activeMasterUrl = masterRef.address.toSparkURL
 activeMasterWebUiUrl = uiUrl
-master = context.actorSelection(
-  Master.toAkkaUrl(activeMasterUrl, 
AkkaUtils.protocol(context.system)))
-masterAddress = Master.toAkkaAddress(activeMasterUrl, 
AkkaUtils.protocol(context.system))
+master = Some(masterRef)
 connected = true
 // Cancel any outstanding re-registration attempts because we found a 
new master
-registrationRetryTimer.foreach(_.cancel())
-registrationRetryTimer = None
+cancelLastRegistrationRetry()
   }
 
-  private def tryRegisterAllMasters() {
-for (masterAkkaUrl <- masterAkkaUrls) {
-  logInfo("Connecting to master " + masterAkkaUrl + "...")
-  val actor = context.actorSelection(masterAkkaUrl)
-  actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
+  private def tryRegisterAllMasters(): Array[JFuture[_]] = {
+for (masterAddress <- masterRpcAddresses) yield {
--- End diff --

Maybe just `masterRpcAddresses.map { masterAddress =>` instead
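i.e. something along these lines, keeping the result type the same and only swapping the comprehension for `map` (the Runnable body stays as in the diff above):

```scala
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        // ... same registration logic as in the quoted diff ...
      }
    })
  }
}
```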





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33517925
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
---
@@ -181,24 +190,31 @@ private[worker] class Worker(
 metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
   }
 
-  private def changeMaster(url: String, uiUrl: String) {
+  private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
 // activeMasterUrl it's a valid Spark url since we receive it from 
master.
-activeMasterUrl = url
+activeMasterUrl = masterRef.address.toSparkURL
 activeMasterWebUiUrl = uiUrl
-master = context.actorSelection(
-  Master.toAkkaUrl(activeMasterUrl, 
AkkaUtils.protocol(context.system)))
-masterAddress = Master.toAkkaAddress(activeMasterUrl, 
AkkaUtils.protocol(context.system))
+master = Some(masterRef)
 connected = true
 // Cancel any outstanding re-registration attempts because we found a 
new master
-registrationRetryTimer.foreach(_.cancel())
-registrationRetryTimer = None
+cancelLastRegistrationRetry()
   }
 
-  private def tryRegisterAllMasters() {
-for (masterAkkaUrl <- masterAkkaUrls) {
-  logInfo("Connecting to master " + masterAkkaUrl + "...")
-  val actor = context.actorSelection(masterAkkaUrl)
-  actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
+  private def tryRegisterAllMasters(): Array[JFuture[_]] = {
+for (masterAddress <- masterRpcAddresses) yield {
+  registerMasterThreadPool.submit(new Runnable {
+override def run(): Unit =
--- End diff --

nit: add braces for this method
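i.e. a formatting-only sketch of the same method with the braces added:

```scala
override def run(): Unit = {
  try {
    // ... registration logic unchanged ...
  } catch {
    case ie: InterruptedException => // Cancelled
    case NonFatal(e) => logError(e.getMessage, e)
  }
}
```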





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-116860938
  
The core logic all looks good to me, just had some nits.





[GitHub] spark pull request: [SPARK-8644] Include call site in SparkExcepti...

2015-06-25 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/7028#issuecomment-115526307
  
I've updated the code to move the injection to runJob. This means that 
exceptions thrown from any path inside the DAGScheduler that fail the job will 
now have their stack trace supplemented, if the job was submitted via runJob 
(i.e., not an approximate job or submitted via SparkContext.submitJob).

After looking over the usages of special call site, I found that streaming 
is a significant user and thus opted to keep the callsite parsing code. I could 
definitely see the benefit of removing the 100 lines of code or so that's 
required to do this for the sake of simplicity at the cost of streaming stack 
traces not being quite as helpful as they could be, so feel free to push back 
and I can change it.
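For readers following the thread, a rough, self-contained illustration of the general trick being described: capture the caller's stack trace at submission time and splice it beneath the scheduler-side trace of any failure. This is a simplification with invented names (`withCallerStackTrace`, the seam text), not the PR's actual code:

```scala
def withCallerStackTrace[T](body: => T): T = {
  // Capture the submitting thread's stack before the work crosses into the event loop.
  val callerTrace = Thread.currentThread().getStackTrace.drop(2)
  try {
    body
  } catch {
    case e: Exception =>
      // Append the captured trace under the scheduler-side trace, separated by a
      // synthetic frame so the seam is visible in the printed stack trace.
      val seam = new StackTraceElement("=== Job Submission ===", "", "", -1)
      e.setStackTrace(e.getStackTrace ++ Array(seam) ++ callerTrace)
      throw e
  }
}
```

Wrapping a call such as `withCallerStackTrace { rdd.count() }` would then surface the submitting line of user code at the bottom of the driver-side exception.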





[GitHub] spark pull request: [SPARK-8644] Include call site in SparkExcepti...

2015-06-25 Thread aarondav
GitHub user aarondav opened a pull request:

https://github.com/apache/spark/pull/7028

[SPARK-8644] Include call site in SparkException stack traces thrown by job 
failures

Example exception (new part at bottom, clearly demarcated):

```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 
(TID 0, localhost): java.lang.RuntimeException: uh-oh!
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33$$anonfun$34$$anonfun$apply$mcJ$sp$1.apply(DAGSchedulerSuite.scala:851)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33$$anonfun$34$$anonfun$apply$mcJ$sp$1.apply(DAGSchedulerSuite.scala:851)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1637)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1285)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1276)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1275)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1275)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:749)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:749)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:749)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1486)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at = Job Submission =.(Native Method)
at org.apache.spark.rdd.RDD.count(RDD.scala:1095)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33$$anonfun$34.apply$mcJ$sp(DAGSchedulerSuite.scala:851)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33$$anonfun$34.apply(DAGSchedulerSuite.scala:851)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33$$anonfun$34.apply(DAGSchedulerSuite.scala:851)
at org.scalatest.Assertions$class.intercept(Assertions.scala:997)
at org.scalatest.FunSuite.intercept(FunSuite.scala:1555)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33.apply$mcV$sp(DAGSchedulerSuite.scala:850)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33.apply(DAGSchedulerSuite.scala:849)
at 
org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$33.apply(DAGSchedulerSuite.scala:849)
at 
org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:42)
at 
org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
at 
org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
at 
org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
```

Major complexity was getting the actual stack trace which we already had in 
the right place back into StackTraceElement form.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com

[GitHub] spark pull request: [SPARK-8644] Include call site in SparkExcepti...

2015-06-25 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/7028#issuecomment-115422594
  
I think the most important thing is to include the user stack trace 
somewhere. Users don't really care what's going on as long as they can identify 
a line of code from their codebase in the stack. That said, I see your point 
about the current approach being overly magical.

Note that producing an Exception that matches your example would require wrapping or replacing the actual exception with a SparkException (otherwise we cannot rewrite the message). However, we do have two disjoint options:

1. Keep the munging logic in DAGScheduler.failJobAndIndependentStages and 
simply put the Job Submission stack trace above the driver stack trace, 
included in the Exception's message. This puts the exception a bit out of 
order but would avoid having to parse the longForm CallSite. We can do this 
because this method only throws SparkExceptions anyway.
2. Move the munging logic to DAGScheduler.runJob and keep it as a fake part 
of the stack trace. This would allow us to inject the user stack trace into 
*all* exceptions thrown as part of the DAGScheduler EventLoop rather than just 
ones that induce normal stage fails.

With number 2, we could additionally replace the use of callSite parsing 
with the current thread's stack trace, which would make it look a lot less 
magical (it's just bridging the two real stack traces across the event loop). 

I'd personally prefer 2 for its improved scope, but I don't know enough 
about how the call site may differ from the physical stack trace to say if we 
should also just use the local stack.





[GitHub] spark pull request: [SPARK-8644] Include call site in SparkExcepti...

2015-06-25 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/7028#issuecomment-115404640
  
cc @JoshRosen 





[GitHub] spark pull request: [SPARK-8161] Set externalBlockStoreInitialized...

2015-06-17 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/6702#issuecomment-112904974
  
LGTM





[GitHub] spark pull request: [SPARK-8306] AddJar command needs to set the n...

2015-06-11 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/6758#discussion_r32253172
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala ---
@@ -91,7 +91,7 @@ case class AddJar(path: String) extends RunnableCommand {
 val jarURL = new java.io.File(path).toURL
 val newClassLoader = new java.net.URLClassLoader(Array(jarURL), 
currentClassLoader)
 Thread.currentThread.setContextClassLoader(newClassLoader)
-
org.apache.hadoop.hive.ql.metadata.Hive.get().getConf().setClassLoader(newClassLoader)
+hiveContext.executionHive.state.getConf.setClassLoader(newClassLoader)
--- End diff --

This is very likely to work for almost all cases, but we should probably 
add a TODO or JIRA to clean up some of this stuff. For instance, here we may be 
setting the executionHive class loader to an entirely different class loader 
than it originally had (which is likely to work in typical situations, but is 
not easy to trace). 

Additionally, the flow of class loader-setting inside ClientWrapper is very 
non-obvious, especially as it differs between Hive 12 and Hive 13 -- we should 
explicitly document this if not make it cleaner.
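To make the concern concrete, a hedged, illustrative sketch of the pattern being discussed (the jar path and variable names are made up): a fresh `URLClassLoader` is chained off whatever loader happens to be current and then handed to several components, which can leave a component with a differently rooted loader than the one it started with.

```scala
import java.net.{URL, URLClassLoader}

// Illustrative only; mirrors the shape of AddJar rather than its exact code.
val jarUrl: URL = new java.io.File("/tmp/some-udf.jar").toURI.toURL
val current = Thread.currentThread.getContextClassLoader
val withJar = new URLClassLoader(Array(jarUrl), current)

// From here on, the thread context loader (and, in the PR, executionHive's conf
// loader) resolve classes through `withJar`, whose parent chain may differ from
// whatever loader those components were originally constructed with.
Thread.currentThread.setContextClassLoader(withJar)
```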





[GitHub] spark pull request: [SPARK-6955] Perform port retries at NettyBloc...

2015-05-08 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5575#issuecomment-100320805
  
cc @andrewor14 @pwendell  We should consider merging this into the 1.4 
branch in case there is another RC. It has been an outstanding issue for a 
while.





[GitHub] spark pull request: [SPARK-6627] Finished rename to ShuffleBlockRe...

2015-05-05 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5764#issuecomment-99260768
  
Yeah, it should be safe to rename that one. It's public due to being in 
Java, no cool `private[visibility]` modifiers.





[GitHub] spark pull request: [SPARK-7183][Network] Fix memory leak of Trans...

2015-05-01 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5743#issuecomment-98166558
  
Just a couple remaining comments, otherwise this LGTM!





[GitHub] spark pull request: [SPARK-7183][Network] Fix memory leak of Trans...

2015-05-01 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5743#discussion_r29510283
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
 ---
@@ -82,10 +75,8 @@ public void exceptionCaught(Throwable cause) {
 
   @Override
   public void channelUnregistered() {
-// Inform the StreamManager that these streams will no longer be read 
from.
-for (long streamId : streamIds) {
-  streamManager.connectionTerminated(streamId);
-}
+// Inform the StreamManager that this channel is unregistered.
--- End diff --

nit: I think we can remove this comment, the new code is straightforward 
enough.





[GitHub] spark pull request: [SPARK-7183][Network] Fix memory leak of Trans...

2015-05-01 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5743#discussion_r29510060
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
 ---
@@ -80,12 +95,17 @@ public ManagedBuffer getChunk(long streamId, int 
chunkIndex) {
   }
 
   @Override
-  public void connectionTerminated(long streamId) {
-// Release all remaining buffers.
-StreamState state = streams.remove(streamId);
-if (state != null  state.buffers != null) {
-  while (state.buffers.hasNext()) {
-state.buffers.next().release();
+  public void connectionTerminated(Channel channel) {
+// Close all streams which have been associated with the channel.
+for (Map.Entry<Long, StreamState> entry : streams.entrySet()) {
+  StreamState state = entry.getValue();
+  if (state.associatedChannel == channel) {
+streams.remove(entry.getKey());
--- End diff --

Ah, good point, this is safe because our Map is a ConcurrentHashMap (or else you would need to use an iterator to remove it safely). Would you mind making the left-hand type of the declaration of `streams` a `ConcurrentHashMap`? This is not the first place where we rely on the semantics of a ConcurrentHashMap over a general Map, so we should make that explicit in the declared type.





[GitHub] spark pull request: [SPARK-7183][Network] Fix memory leak of Trans...

2015-05-01 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5743#discussion_r29510128
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
 ---
@@ -42,11 +46,15 @@
   private static class StreamState {
 final Iterator<ManagedBuffer> buffers;
 
+// The channel associated to the stream
+Channel associatedChannel = null;
+
 // Used to keep track of the index of the buffer that the user has 
retrieved, just to ensure
 // that the caller only requests each chunk one at a time, in order.
 int curChunk = 0;
 
 StreamState(Iterator<ManagedBuffer> buffers) {
+  Preconditions.checkNotNull(buffers);
   this.buffers = buffers;
--- End diff --

nit: `this.buffers = Preconditions.checkNotNull(buffers)`





[GitHub] spark pull request: [SPARK-7183][Network] Fix memory leak of Trans...

2015-05-01 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5743#discussion_r29510421
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
 ---
@@ -102,7 +93,8 @@ public void handle(RequestMessage request) {
 
   private void processFetchRequest(final ChunkFetchRequest req) {
 final String client = NettyUtils.getRemoteAddress(channel);
-streamIds.add(req.streamChunkId.streamId);
+
+streamManager.registerChannel(channel, req.streamChunkId.streamId);
--- End diff --

As we don't know from this point in the code whether the registerChannel 
method will throw an exception, let's move it inside the try-catch for the 
getChunk(). Otherwise we could leave a message unresponded-to, which would be 
bad.





[GitHub] spark pull request: [SPARK-7183][Network] Fix memory leak of Trans...

2015-05-01 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5743#issuecomment-98205477
  
LGTM, merging in to master. Thanks!





[GitHub] spark pull request: [SPARK-7183][Network] Fix memory leak of Trans...

2015-04-30 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5743#discussion_r29453281
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/server/StreamManager.java 
---
@@ -44,6 +46,17 @@
   public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
 
   /**
+   * Register the given stream to the associated channel. So these streams 
can be cleaned up later.
--- End diff --

Let's beef up the documentation:

Associates a stream with a single client connection, which is guaranteed to 
be the only reader of the stream. The getChunk() method will be called serially 
on this connection and once the connection is closed, the stream will never be 
used again, enabling cleanup.

This must be called before the first getChunk() on the stream, but it may 
be invoked multiple times with the same channel and stream id.





[GitHub] spark pull request: [SPARK-7183][Network] Fix memory leak of Trans...

2015-04-30 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5743#discussion_r29453868
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
 ---
@@ -56,6 +62,15 @@ public OneForOneStreamManager() {
 // This does not need to be globally unique, only unique to this class.
 nextStreamId = new AtomicLong((long) new 
Random().nextInt(Integer.MAX_VALUE) * 1000);
 streams = new ConcurrentHashMap<Long, StreamState>();
+streamIds = new ConcurrentHashMap<Channel, Set<Long>>();
+  }
+
+  @Override
+  public void registerChannel(Channel channel, long streamId) {
--- End diff --

I think we can avoid the new field by doing

```java
streams.get(streamId).associatedChannel = channel;
```

here and

```java
Iterator<Map.Entry<Long, StreamState>> streamIterator = streams.entrySet().iterator();
while (streamIterator.hasNext()) {
  StreamState state = streamIterator.next().getValue();
  if (state.associatedChannel == channel) {
    streamIterator.remove();

    // Release all remaining buffers.
    while (state.buffers.hasNext()) {
      state.buffers.next().release();
    }
  }
}
```

Allowing the removal of the other connectionTerminated().





[GitHub] spark pull request: [SPARK-7183][Network] Fix memory leak of Trans...

2015-04-30 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5743#discussion_r29452859
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/server/StreamManager.java 
---
@@ -44,6 +46,17 @@
   public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
 
   /**
+   * Register the given stream to the associated channel. So these streams 
can be cleaned up later.
+   */
+  public void registerChannel(Channel channel, long streamId) { }
+
+  /**
+   * Indicates that the given channel has been terminated. After this 
occurs, we are guaranteed not
+   * to read from the associated streams again, so any state can be 
cleaned up.
+   */
+  public void connectionTerminated(Channel channel) { }
+
+  /**
--- End diff --

Please remove this method, it should no longer be used.





[GitHub] spark pull request: [SPARK-7183][Network] Fix memory leak of Trans...

2015-04-30 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5743#discussion_r29453981
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
 ---
@@ -56,6 +62,15 @@ public OneForOneStreamManager() {
 // This does not need to be globally unique, only unique to this class.
 nextStreamId = new AtomicLong((long) new 
Random().nextInt(Integer.MAX_VALUE) * 1000);
 streams = new ConcurrentHashMapLong, StreamState();
+streamIds = new ConcurrentHashMapChannel, SetLong();
--- End diff --

Also, if you wouldn't mind, add a Preconditions.checkNotNull(buffers) to 
line 56 (to ensure buffers is not null).





[GitHub] spark pull request: [SPARK-7183][Network] Fix memory leak of Trans...

2015-04-28 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5743#issuecomment-97192903
  
This is a reasonable solution, but I think the actual issue is that TransportRequestHandler has a list of streamIds at all. I think a different solution would be to have the StreamManager associate streams with channels (which is already a documented guarantee of StreamManager).

This would involve making StreamManager's getChunk take a TransportClient (or a channelId, generated via `channel.toString`), which would update the StreamState, and then making connectionTerminated also take this identifier (iterating over all streams to close the associated ones).





[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-27 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r29195262
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java 
---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.sasl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.FileRegion;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.util.AbstractReferenceCounted;
+import io.netty.util.ReferenceCountUtil;
+
+import org.apache.spark.network.util.ByteArrayWritableChannel;
+import org.apache.spark.network.util.NettyUtils;
+
+class SaslEncryption {
+
+  @VisibleForTesting
+  static final String ENCRYPTION_HANDLER_NAME = "saslEncryption";
+
+  /**
+   * Adds channel handlers that perform encryption / decryption of data 
using SASL.
+   *
+   * @param channel The channel.
+   * @param backend The SASL backend.
+   * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted 
blocks, to control
+   * memory usage.
+   */
+  static void addToChannel(
+  Channel channel,
+  SaslEncryptionBackend backend,
+  int maxOutboundBlockSize) {
+channel.pipeline()
+  .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, maxOutboundBlockSize))
+  .addFirst("saslDecryption", new DecryptionHandler(backend))
+  .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder());
+  }
+
+  private static class EncryptionHandler extends 
ChannelOutboundHandlerAdapter {
+
+private final int maxOutboundBlockSize;
+private final SaslEncryptionBackend backend;
+
+EncryptionHandler(SaslEncryptionBackend backend, int 
maxOutboundBlockSize) {
+  this.backend = backend;
+  this.maxOutboundBlockSize = maxOutboundBlockSize;
+}
+
+/**
+ * Wrap the incoming message in an implementation that will perform 
encryption lazily. This is
+ * needed to guarantee ordering of the outgoing encrypted packets - 
they need to be decrypted in
+ * the same order, and netty doesn't have an atomic 
ChannelHandlerContext.write() API, so it
+ * does not guarantee any ordering.
+ */
+@Override
+public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise)
+  throws Exception {
+
+  ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), 
promise);
+}
+
+@Override
+public void handlerRemoved(ChannelHandlerContext ctx) throws Exception 
{
+  try {
+backend.dispose();
+  } finally {
+super.handlerRemoved(ctx);
+  }
+}
+
+  }
+
+  private static class DecryptionHandler extends MessageToMessageDecoder<ByteBuf> {
+
+private final SaslEncryptionBackend backend;
+
+DecryptionHandler(SaslEncryptionBackend backend) {
+  this.backend = backend;
+}
+
+@Override
+protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
+  throws Exception {
+
+  byte[] data;
+  int offset;
+  int length = msg.readableBytes();
+  if (msg.hasArray()) {
+data = msg.array();
+offset = msg.arrayOffset();
--- End diff --

I see, it's just slightly odd that only one of the two cases moves msg's 
reader index

[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-27 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r29196457
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java 
---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.sasl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.FileRegion;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.util.AbstractReferenceCounted;
+import io.netty.util.ReferenceCountUtil;
+
+import org.apache.spark.network.util.ByteArrayWritableChannel;
+import org.apache.spark.network.util.NettyUtils;
+
+class SaslEncryption {
+
+  @VisibleForTesting
+  static final String ENCRYPTION_HANDLER_NAME = "saslEncryption";
+
+  /**
+   * Adds channel handlers that perform encryption / decryption of data 
using SASL.
+   *
+   * @param channel The channel.
+   * @param backend The SASL backend.
+   * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted 
blocks, to control
+   * memory usage.
+   */
+  static void addToChannel(
+  Channel channel,
+  SaslEncryptionBackend backend,
+  int maxOutboundBlockSize) {
+channel.pipeline()
+  .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, maxOutboundBlockSize))
+  .addFirst("saslDecryption", new DecryptionHandler(backend))
+  .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder());
+  }
+
+  private static class EncryptionHandler extends 
ChannelOutboundHandlerAdapter {
+
+private final int maxOutboundBlockSize;
+private final SaslEncryptionBackend backend;
+
+EncryptionHandler(SaslEncryptionBackend backend, int 
maxOutboundBlockSize) {
+  this.backend = backend;
+  this.maxOutboundBlockSize = maxOutboundBlockSize;
+}
+
+/**
+ * Wrap the incoming message in an implementation that will perform 
encryption lazily. This is
+ * needed to guarantee ordering of the outgoing encrypted packets - 
they need to be decrypted in
+ * the same order, and netty doesn't have an atomic 
ChannelHandlerContext.write() API, so it
+ * does not guarantee any ordering.
+ */
+@Override
+public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise)
+  throws Exception {
+
+  ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), 
promise);
+}
+
+@Override
+public void handlerRemoved(ChannelHandlerContext ctx) throws Exception 
{
+  try {
+backend.dispose();
+  } finally {
+super.handlerRemoved(ctx);
+  }
+}
+
+  }
+
+  private static class DecryptionHandler extends 
MessageToMessageDecoder<ByteBuf> {
+
+private final SaslEncryptionBackend backend;
+
+DecryptionHandler(SaslEncryptionBackend backend) {
+  this.backend = backend;
+}
+
+@Override
+protected void decode(ChannelHandlerContext ctx, ByteBuf msg, 
List<Object> out)
+  throws Exception {
+
+  byte[] data;
+  int offset;
+  int length = msg.readableBytes();
+  if (msg.hasArray()) {
+data = msg.array();
+offset = msg.arrayOffset();
+  } else {
+data = new byte[length];
+msg.readBytes(data);
+offset = 0

[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-27 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r29204220
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java 
---
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.sasl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.FileRegion;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.util.AbstractReferenceCounted;
+import io.netty.util.ReferenceCountUtil;
+
+import org.apache.spark.network.util.NettyUtils;
+
+/**
+ * Provides SASL-based encryption for transport channels. The single 
method exposed by this
+ * class installs the needed channel handlers on a connected channel.
+ */
+class SaslEncryption {
+
+  @VisibleForTesting
+  static final String ENCRYPTION_HANDLER_NAME = "saslEncryption";
+
+  /**
+   * Adds channel handlers that perform encryption / decryption of data 
using SASL.
+   *
+   * @param channel The channel.
+   * @param backend The SASL backend.
+   * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted 
blocks, to control
+   * memory usage.
+   */
+  static void addToChannel(
+  Channel channel,
+  SaslEncryptionBackend backend,
+  int maxOutboundBlockSize) {
+channel.pipeline()
+  .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, 
maxOutboundBlockSize))
+  .addFirst("saslDecryption", new DecryptionHandler(backend))
+  .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder());
+  }
+
+  private static class EncryptionHandler extends 
ChannelOutboundHandlerAdapter {
+
+private final int maxOutboundBlockSize;
+private final SaslEncryptionBackend backend;
+
+EncryptionHandler(SaslEncryptionBackend backend, int 
maxOutboundBlockSize) {
+  this.backend = backend;
+  this.maxOutboundBlockSize = maxOutboundBlockSize;
+}
+
+/**
+ * Wrap the incoming message in an implementation that will perform 
encryption lazily. This is
+ * needed to guarantee ordering of the outgoing encrypted packets - 
they need to be decrypted in
+ * the same order, and netty doesn't have an atomic 
ChannelHandlerContext.write() API, so it
+ * does not guarantee any ordering.
+ */
+@Override
+public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise)
+  throws Exception {
+
+  ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), 
promise);
+}
+
+@Override
+public void handlerRemoved(ChannelHandlerContext ctx) throws Exception 
{
+  try {
+backend.dispose();
+  } finally {
+super.handlerRemoved(ctx);
+  }
+}
+
+  }
+
+  private static class DecryptionHandler extends 
MessageToMessageDecoder<ByteBuf> {
+
+private final SaslEncryptionBackend backend;
+
+DecryptionHandler(SaslEncryptionBackend backend) {
+  this.backend = backend;
+}
+
+@Override
+protected void decode(ChannelHandlerContext ctx, ByteBuf msg, 
List<Object> out)
+  throws Exception {
+
+  byte[] data;
+  int offset;
+  int length = msg.readableBytes

[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-26 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r29121679
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java 
---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.sasl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.FileRegion;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.util.AbstractReferenceCounted;
+import io.netty.util.ReferenceCountUtil;
+
+import org.apache.spark.network.util.ByteArrayWritableChannel;
+import org.apache.spark.network.util.NettyUtils;
+
+class SaslEncryption {
--- End diff --

Please add a class comment


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-26 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r29121182
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java 
---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.sasl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.FileRegion;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.util.AbstractReferenceCounted;
+import io.netty.util.ReferenceCountUtil;
+
+import org.apache.spark.network.util.ByteArrayWritableChannel;
+import org.apache.spark.network.util.NettyUtils;
+
+class SaslEncryption {
+
+  @VisibleForTesting
+  static final String ENCRYPTION_HANDLER_NAME = "saslEncryption";
+
+  /**
+   * Adds channel handlers that perform encryption / decryption of data 
using SASL.
+   *
+   * @param channel The channel.
+   * @param backend The SASL backend.
+   * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted 
blocks, to control
+   * memory usage.
+   */
+  static void addToChannel(
+  Channel channel,
+  SaslEncryptionBackend backend,
+  int maxOutboundBlockSize) {
+channel.pipeline()
+  .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, 
maxOutboundBlockSize))
+  .addFirst("saslDecryption", new DecryptionHandler(backend))
+  .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder());
+  }
+
+  private static class EncryptionHandler extends 
ChannelOutboundHandlerAdapter {
+
+private final int maxOutboundBlockSize;
+private final SaslEncryptionBackend backend;
+
+EncryptionHandler(SaslEncryptionBackend backend, int 
maxOutboundBlockSize) {
+  this.backend = backend;
+  this.maxOutboundBlockSize = maxOutboundBlockSize;
+}
+
+/**
+ * Wrap the incoming message in an implementation that will perform 
encryption lazily. This is
+ * needed to guarantee ordering of the outgoing encrypted packets - 
they need to be decrypted in
+ * the same order, and netty doesn't have an atomic 
ChannelHandlerContext.write() API, so it
+ * does not guarantee any ordering.
+ */
+@Override
+public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise)
+  throws Exception {
+
+  ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), 
promise);
+}
+
+@Override
+public void handlerRemoved(ChannelHandlerContext ctx) throws Exception 
{
+  try {
+backend.dispose();
+  } finally {
+super.handlerRemoved(ctx);
+  }
+}
+
+  }
+
+  private static class DecryptionHandler extends 
MessageToMessageDecoder<ByteBuf> {
+
+private final SaslEncryptionBackend backend;
+
+DecryptionHandler(SaslEncryptionBackend backend) {
+  this.backend = backend;
+}
+
+@Override
+protected void decode(ChannelHandlerContext ctx, ByteBuf msg, 
List<Object> out)
+  throws Exception {
+
+  byte[] data;
+  int offset;
+  int length = msg.readableBytes();
+  if (msg.hasArray()) {
+data = msg.array();
+offset = msg.arrayOffset();
+  } else {
+data = new byte[length];
+msg.readBytes(data);
+offset = 0

[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-26 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r29121766
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/TransportContext.java ---
@@ -99,9 +108,9 @@ public TransportServer createServer() {
* be used to communicate on this channel. The TransportClient is 
directly associated with a
* ChannelHandler to ensure all users of the same channel get the same 
TransportClient object.
*/
-  public TransportChannelHandler initializePipeline(SocketChannel channel) 
{
+  public TransportChannelHandler initializePipeline(SocketChannel channel, 
RpcHandler rpcHandler) {
--- End diff --

The rpcHandler vs. appRpcHandler distinction is definitely confusing to someone 
reading this for the first time; please add a comment here or in 
TransportContext about the difference.
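For illustration only, the requested comment might read roughly like this (the wording below is a guess, not text from the PR):

```java
// Hypothetical wording for the requested comment; not taken from TransportContext itself.
/**
 * Initializes a client or server Netty Channel pipeline.
 *
 * The rpcHandler passed here may differ from the appRpcHandler held by this TransportContext:
 * when SASL is enabled, the server bootstrap passes in a wrapper handler that performs
 * authentication on the channel before delegating RPCs to the application-level handler.
 */
public TransportChannelHandler initializePipeline(SocketChannel channel, RpcHandler rpcHandler) {
  // ...
}
```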


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-26 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r29121411
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java 
---
@@ -46,19 +47,30 @@
   /** Class which provides secret keys which are shared by server and 
client on a per-app basis. */
   private final SecretKeyHolder secretKeyHolder;
 
-  /** Maps each channel to its SASL authentication state. */
-  private final ConcurrentMap<TransportClient, SparkSaslServer> 
channelAuthenticationMap;
+  /** The client channel. */
+  private final Channel channel;
 
-  public SaslRpcHandler(RpcHandler delegate, SecretKeyHolder 
secretKeyHolder) {
+  private final TransportConf conf;
+
+  private SparkSaslServer saslServer;
+  private boolean isComplete;
+
+  SaslRpcHandler(
+  TransportConf conf,
+  Channel channel,
+  RpcHandler delegate,
+  SecretKeyHolder secretKeyHolder) {
+this.conf = conf;
--- End diff --

nit: reorder fields to follow same order as constructor parameters


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-26 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r29121094
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java 
---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.sasl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.FileRegion;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.util.AbstractReferenceCounted;
+import io.netty.util.ReferenceCountUtil;
+
+import org.apache.spark.network.util.ByteArrayWritableChannel;
+import org.apache.spark.network.util.NettyUtils;
+
+class SaslEncryption {
+
+  @VisibleForTesting
+  static final String ENCRYPTION_HANDLER_NAME = "saslEncryption";
+
+  /**
+   * Adds channel handlers that perform encryption / decryption of data 
using SASL.
+   *
+   * @param channel The channel.
+   * @param backend The SASL backend.
+   * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted 
blocks, to control
+   * memory usage.
+   */
+  static void addToChannel(
+  Channel channel,
+  SaslEncryptionBackend backend,
+  int maxOutboundBlockSize) {
+channel.pipeline()
+  .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, 
maxOutboundBlockSize))
+  .addFirst("saslDecryption", new DecryptionHandler(backend))
+  .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder());
+  }
+
+  private static class EncryptionHandler extends 
ChannelOutboundHandlerAdapter {
+
+private final int maxOutboundBlockSize;
+private final SaslEncryptionBackend backend;
+
+EncryptionHandler(SaslEncryptionBackend backend, int 
maxOutboundBlockSize) {
+  this.backend = backend;
+  this.maxOutboundBlockSize = maxOutboundBlockSize;
+}
+
+/**
+ * Wrap the incoming message in an implementation that will perform 
encryption lazily. This is
+ * needed to guarantee ordering of the outgoing encrypted packets - 
they need to be decrypted in
+ * the same order, and netty doesn't have an atomic 
ChannelHandlerContext.write() API, so it
+ * does not guarantee any ordering.
+ */
+@Override
+public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise)
+  throws Exception {
+
+  ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), 
promise);
+}
+
+@Override
+public void handlerRemoved(ChannelHandlerContext ctx) throws Exception 
{
+  try {
+backend.dispose();
+  } finally {
+super.handlerRemoved(ctx);
+  }
+}
+
+  }
+
+  private static class DecryptionHandler extends 
MessageToMessageDecoder<ByteBuf> {
+
+private final SaslEncryptionBackend backend;
+
+DecryptionHandler(SaslEncryptionBackend backend) {
+  this.backend = backend;
+}
+
+@Override
+protected void decode(ChannelHandlerContext ctx, ByteBuf msg, 
List<Object> out)
+  throws Exception {
+
+  byte[] data;
+  int offset;
+  int length = msg.readableBytes();
+  if (msg.hasArray()) {
+data = msg.array();
+offset = msg.arrayOffset();
--- End diff --

should we advance msg by the readableBytes?


---
If your project is set up for it, you can

[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-26 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r29121561
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java 
---
@@ -75,11 +81,20 @@
   private final SecretKeyHolder secretKeyHolder;
   private SaslServer saslServer;
 
-  public SparkSaslServer(String secretKeyId, SecretKeyHolder 
secretKeyHolder) {
+  public SparkSaslServer(
+  String secretKeyId,
+  SecretKeyHolder secretKeyHolder,
+  boolean alwaysEncrypt) {
 this.secretKeyId = secretKeyId;
 this.secretKeyHolder = secretKeyHolder;
+
+String qop = alwaysEncrypt ? QOP_AUTH_CONF : String.format("%s,%s", 
QOP_AUTH_CONF, QOP_AUTH);
--- End diff --

I assume this is a comma-separated list of the supported formats, for 
negotiation? Maybe add a comment to this effect.
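That reading matches how javax.security.sasl treats Sasl.QOP: a comma-separated, preference-ordered list that the two sides negotiate down to a single value. A small, self-contained sketch of the mechanics (the DIGEST-MD5 mechanism, server name, and callback handler here are placeholders, not the PR's code):

```java
// Sketch only: how a comma-separated Sasl.QOP list feeds into SASL negotiation.
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.callback.CallbackHandler;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;

class QopNegotiationSketch {
  static SaslServer createServer(boolean alwaysEncrypt, CallbackHandler callbacks)
      throws SaslException {
    // Preference order matters: listing "auth-conf" first means encryption wins when both sides allow it.
    String qop = alwaysEncrypt ? "auth-conf" : "auth-conf,auth";
    Map<String, String> props = new HashMap<>();
    props.put(Sasl.QOP, qop);
    SaslServer server = Sasl.createSaslServer("DIGEST-MD5", null, "default", props, callbacks);
    // After the challenge/response exchange completes, server.getNegotiatedProperty(Sasl.QOP)
    // reports the agreed value; "auth-conf" means wrap()/unwrap() will also encrypt payloads.
    return server;
  }
}
```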


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-26 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r29121544
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java 
---
@@ -60,13 +60,19 @@
   static final String DIGEST = "DIGEST-MD5";
 
   /**
-   * The quality of protection is just auth. This means that we are doing
-   * authentication only, we are not supporting integrity or privacy 
protection of the
-   * communication channel after authentication. This could be changed to 
be configurable
-   * in the future.
+   * QOP value that includes encryption.
--- End diff --

I liked the spelled-out quality of protection better :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-26 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r29121527
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java 
---
@@ -60,13 +60,19 @@
   static final String DIGEST = "DIGEST-MD5";
 
   /**
-   * The quality of protection is just auth. This means that we are doing
-   * authentication only, we are not supporting integrity or privacy 
protection of the
-   * communication channel after authentication. This could be changed to 
be configurable
-   * in the future.
+   * QOP value that includes encryption.
+   */
+  static final String QOP_AUTH_CONF = "auth-conf";
+
+  /**
+   * QOP value that does not include encryption.
+   */
+  static final String QOP_AUTH = "auth";
+
+  /**
+   * Common SASL config properties for both client and server.
*/
   static final Map<String, String> SASL_PROPS = ImmutableMap.<String, String>builder()
-.put(Sasl.QOP, "auth")
 .put(Sasl.SERVER_AUTH, "true")
--- End diff --

Is this property relevant for the client? Potentially we could just do away 
with this static map if not.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-26 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5377#issuecomment-96514035
  
LGTM, only minor comments. The tests look good. Apologies for taking so 
long to review!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-26 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r29121332
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java 
---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.sasl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.FileRegion;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.util.AbstractReferenceCounted;
+import io.netty.util.ReferenceCountUtil;
+
+import org.apache.spark.network.util.ByteArrayWritableChannel;
+import org.apache.spark.network.util.NettyUtils;
+
+class SaslEncryption {
+
+  @VisibleForTesting
+  static final String ENCRYPTION_HANDLER_NAME = "saslEncryption";
+
+  /**
+   * Adds channel handlers that perform encryption / decryption of data 
using SASL.
+   *
+   * @param channel The channel.
+   * @param backend The SASL backend.
+   * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted 
blocks, to control
+   * memory usage.
+   */
+  static void addToChannel(
+  Channel channel,
+  SaslEncryptionBackend backend,
+  int maxOutboundBlockSize) {
+channel.pipeline()
+  .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, 
maxOutboundBlockSize))
+  .addFirst("saslDecryption", new DecryptionHandler(backend))
+  .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder());
+  }
+
+  private static class EncryptionHandler extends 
ChannelOutboundHandlerAdapter {
+
+private final int maxOutboundBlockSize;
+private final SaslEncryptionBackend backend;
+
+EncryptionHandler(SaslEncryptionBackend backend, int 
maxOutboundBlockSize) {
+  this.backend = backend;
+  this.maxOutboundBlockSize = maxOutboundBlockSize;
+}
+
+/**
+ * Wrap the incoming message in an implementation that will perform 
encryption lazily. This is
+ * needed to guarantee ordering of the outgoing encrypted packets - 
they need to be decrypted in
+ * the same order, and netty doesn't have an atomic 
ChannelHandlerContext.write() API, so it
+ * does not guarantee any ordering.
+ */
+@Override
+public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise)
+  throws Exception {
+
+  ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), 
promise);
+}
+
+@Override
+public void handlerRemoved(ChannelHandlerContext ctx) throws Exception 
{
+  try {
+backend.dispose();
+  } finally {
+super.handlerRemoved(ctx);
+  }
+}
+
+  }
+
+  private static class DecryptionHandler extends 
MessageToMessageDecoder<ByteBuf> {
+
+private final SaslEncryptionBackend backend;
+
+DecryptionHandler(SaslEncryptionBackend backend) {
+  this.backend = backend;
+}
+
+@Override
+protected void decode(ChannelHandlerContext ctx, ByteBuf msg, 
List<Object> out)
+  throws Exception {
+
+  byte[] data;
+  int offset;
+  int length = msg.readableBytes();
+  if (msg.hasArray()) {
+data = msg.array();
+offset = msg.arrayOffset();
+  } else {
+data = new byte[length];
+msg.readBytes(data);
+offset = 0

[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-26 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r29121814
  
--- Diff: 
network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java 
---
@@ -86,4 +117,237 @@ public void testNonMatching() {
   assertFalse(server.isComplete());
 }
   }
+
+  @Test
+  public void testSaslAuthentication() throws Exception {
+testBasicSasl(false);
+  }
+
+  @Test
+  public void testSaslEncryption() throws Exception {
--- End diff --

I think these methods should be right next to testBasicSasl's definition


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-26 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r29121626
  
--- Diff: network/common/src/test/resources/log4j.properties ---
@@ -23,5 +23,5 @@ log4j.appender.file.file=target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t 
%p %c{1}: %m%n
 
-# Silence verbose logs from 3rd-party libraries.
+# Filter debug messages from noisy 3rd-party libs.
--- End diff --

kind of a funny change


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-26 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r29121639
  
--- Diff: 
network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
 ---
@@ -58,10 +60,15 @@
   public ExternalShuffleClient(
   TransportConf conf,
   SecretKeyHolder secretKeyHolder,
-  boolean saslEnabled) {
+  boolean saslEnabled,
+  boolean saslEncryptionEnabled) {
+Preconditions.checkArgument(
+!saslEncryptionEnabled || saslEnabled,
--- End diff --

nit: I think 2 space indent is appropriate here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7120][SPARK-7121][WIP] Closure cleaner ...

2015-04-24 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5685#discussion_r29065533
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -167,15 +294,17 @@ private[spark] object ClosureCleaner extends Logging {
 }
   }
 
-  private def instantiateClass(cls: Class[_], outer: AnyRef, 
inInterpreter: Boolean): AnyRef = {
-// logInfo(Creating a  + cls +  with outer =  + outer)
+  private def instantiateClass(
+  cls: Class[_],
+  enclosingObject: AnyRef,
+  inInterpreter: Boolean): AnyRef = {
 if (!inInterpreter) {
   // This is a bona fide closure class, whose constructor has no 
effects
   // other than to set its fields, so use its constructor
   val cons = cls.getConstructors()(0)
   val params = cons.getParameterTypes.map(createNullValue).toArray
-  if (outer != null) {
-params(0) = outer // First param is always outer object
+  if (enclosingObject!= null) {
--- End diff --

nit: space


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7120][SPARK-7121][WIP] Closure cleaner ...

2015-04-24 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5685#discussion_r29081651
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala ---
@@ -50,7 +50,7 @@ class ClosureCleanerSuite extends FunSuite {
 val obj = new TestClassWithNesting(1)
 assert(obj.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
   }
-  
+
--- End diff --

The suite should be expanded with the fix, right? Could you also test the 
common `for` loop case that others have seen in the past?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7003] Improve reliability of connection...

2015-04-20 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5584#issuecomment-94507189
  
Merging into master. We may consider backporting to 1.3 if it turns out 
this fixes [SPARK-6962](https://issues.apache.org/jira/browse/SPARK-6962), 
which is a pretty nasty bug for those who run into it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6955] Perform port retries at NettyBloc...

2015-04-20 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5575#discussion_r28721339
  
--- Diff: 
core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import org.apache.spark.network.BlockDataManager
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.mockito.Mockito.mock
+import org.scalatest._
+
+class NettyBlockTransferServiceSuite extends FunSuite with 
BeforeAndAfterEach with ShouldMatchers {
+  private var service0: NettyBlockTransferService = _
+  private var service1: NettyBlockTransferService = _
+
+  override def afterEach() {
+if (service0 != null) {
+  service0.close()
+  service0 = null
+}
+
+if (service1 != null) {
+  service1.close()
+  service1 = null
+}
+  }
+
+  test("can bind to a random port") {
+service0 = createService(port = 0)
+service0.port should not be 0
+  }
+
+  test("can bind to two random ports") {
+service0 = createService(port = 0)
+service1 = createService(port = 0)
+service0.port should not be service1.port
+  }
+
+  test("can bind to a specific port") {
+val port = 17634
+service0 = createService(port)
+service0.port should be >= port
+service0.port should be <= (port + 10) // avoid testing equality in 
case of simultaneous tests
--- End diff --

I can increase the number no problem, though I guess we should use 
`Utils.portMaxRetries` instead. However, note that even 10 should be very 
unlikely to be hit; that would mean there are 5-10 concurrently running tests at 
this exact part of the build, or else we're very, very unlucky.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6955] Perform port retries at NettyBloc...

2015-04-19 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5575#issuecomment-94243393
  
Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6955] Perform port retries at NettyBloc...

2015-04-19 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5575#issuecomment-94307038
  
Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7003] Improve reliability of connection...

2015-04-19 Thread aarondav
GitHub user aarondav opened a pull request:

https://github.com/apache/spark/pull/5584

[SPARK-7003] Improve reliability of connection failure detection between 
Netty block transfer service endpoints

Currently we rely on the assumption that an exception will be raised and 
the channel closed if two endpoints cannot communicate over a Netty TCP 
channel. However, this guarantee does not hold in all network environments, and 
[SPARK-6962](https://issues.apache.org/jira/browse/SPARK-6962) seems to point 
to a case where only the server side of the connection detected a fault.

This patch improves robustness of fetch/rpc requests by having an explicit 
timeout in the transport layer which closes the connection if there is a period 
of inactivity while there are outstanding requests.
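As a hedged sketch of the idea rather than the patch itself: Netty's IdleStateHandler can flag a channel that has gone silent, and a companion handler can close it when requests are still outstanding (the in-flight counter below is a hypothetical placeholder for the transport layer's bookkeeping).

```java
// Sketch only: close a quiet channel when requests are still pending, instead of
// waiting for a TCP-level error that may never arrive.
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

class RequestAwareTimeoutHandler extends ChannelDuplexHandler {
  final AtomicInteger outstandingRequests = new AtomicInteger();  // hypothetical in-flight counter

  static void install(Channel channel, long timeoutSecs) {
    // 0 for reader/writer idle: only the combined "all idle" timeout matters here.
    channel.pipeline()
      .addLast("idleStateHandler", new IdleStateHandler(0, 0, timeoutSecs, TimeUnit.SECONDS))
      .addLast("timeoutHandler", new RequestAwareTimeoutHandler());
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent && outstandingRequests.get() > 0) {
      // Nothing read or written for the whole timeout while requests are pending:
      // treat the connection as dead and fail the outstanding requests by closing it.
      ctx.close();
    } else {
      super.userEventTriggered(ctx, evt);
    }
  }
}
```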

NB: This patch is actually only around 50 lines added if you exclude the 
testing-related code.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aarondav/spark timeout

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/5584.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5584


commit aa5278b5e558c5416caa165412f24364a6e1df78
Author: Aaron Davidson aa...@databricks.com
Date:   2015-04-20T02:46:15Z

[SPARK-7003] Improve reliability of connection failure detection between 
Netty block transfer service endpoints

Currently we rely on the assumption that an exception will be raised and 
the channel closed if two endpoints cannot communicate over a Netty TCP 
channel. However, this guarantee does not hold in all network environments, and 
[SPARK-6962](https://issues.apache.org/jira/browse/SPARK-6962) seems to point 
to a case where only the server side of the connection detected a fault.

This patch improves robustness of fetch/rpc requests by having an explicit 
timeout in the transport layer which closes the connection if there is a period 
of inactivity while there are outstanding requests.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7003] Improve reliability of connection...

2015-04-19 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5584#issuecomment-94344712
  
cc @rxin 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7003] Improve reliability of connection...

2015-04-19 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5584#discussion_r28662180
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/** ConfigProvider based on an immutable Map. */
--- End diff --

that's like twice as many characters -- do you even tweet, bro?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7003] Improve reliability of connection...

2015-04-19 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5584#discussion_r28662130
  
--- Diff: 
network/common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/** ConfigProvider based on an immutable Map. */
--- End diff --

well we make a copy, was the point
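A standalone sketch of that defensive-copy point (ImmutableMap.copyOf stands in for whatever copy the real class makes):

```java
// Sketch only: copy the caller's map up front so later mutations of the source
// cannot change what the provider returns.
import java.util.Map;
import java.util.NoSuchElementException;

import com.google.common.collect.ImmutableMap;

class MapConfigProviderSketch {
  private final Map<String, String> config;

  MapConfigProviderSketch(Map<String, String> config) {
    this.config = ImmutableMap.copyOf(config);  // defensive, immutable copy
  }

  String get(String name) {
    String value = config.get(name);
    if (value == null) {
      throw new NoSuchElementException(name);
    }
    return value;
  }
}
```

Mutating the source map after construction then has no effect on what get() returns, which is the copy that was "the point".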


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7003] Improve reliability of connection...

2015-04-19 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5584#discussion_r28662125
  
--- Diff: 
network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
 ---
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.ChunkReceivedCallback;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.client.TransportClientFactory;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.StreamManager;
+import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.util.MapConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+import org.junit.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Suite which ensures that requests that go without a response for the 
network timeout period are
+ * failed, and the connection closed.
+ *
+ * In this suite, we use 5 seconds as the connection timeout, with some 
slack given in the tests,
+ * to ensure stability in different test environments.
+ */
+public class RequestTimeoutIntegrationSuite {
+
+  private TransportServer server;
+  private TransportClientFactory clientFactory;
+
+  private StreamManager defaultManager;
+  private TransportConf conf;
+
+  // A large timeout that shouldn't happen, for the sake of faulty tests 
not hanging forever.
+  private final int FOREVER = 60 * 1000;
+
+  @Before
+  public void setUp() throws Exception {
+Map<String, String> configMap = Maps.newHashMap();
+configMap.put("spark.shuffle.io.connectionTimeout", "5s");
--- End diff --

The tests take 19 seconds, which is annoying but preferable to sporadically 
failing. I don't trust overloaded Jenkins machines, but I could drop it by a 
couple factors.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7003] Improve reliability of connection...

2015-04-19 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5584#issuecomment-94347409
  
All comments addressed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6955] Perform port retries at NettyBloc...

2015-04-18 Thread aarondav
GitHub user aarondav opened a pull request:

https://github.com/apache/spark/pull/5575

[SPARK-6955] Perform port retries at NettyBlockTransferService level

Currently we're doing port retries at the TransportServer level, but this 
is not specified by the TransportContext API and it has other further-reaching 
impacts like causing undesirable behavior for the Yarn and Standalone shuffle 
services.
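As an illustration of the retry idea only, using plain java.net sockets rather than Spark's transport classes: try the requested port, then successive ports, and give up after a fixed number of attempts.

```java
// Sketch only: retry a bind on successive ports at the service layer.
import java.io.IOException;
import java.net.ServerSocket;

class PortRetrySketch {
  static ServerSocket bindWithRetries(int startPort, int maxRetries) throws IOException {
    IOException last = null;
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      // Port 0 asks the OS for any free port, so adding an offset only makes sense for a fixed port.
      int candidate = (startPort == 0) ? 0 : startPort + attempt;
      try {
        return new ServerSocket(candidate);
      } catch (IOException e) {
        last = e;  // port in use; fall through and try the next one
      }
    }
    throw last;
  }
}
```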

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aarondav/spark port-bind

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/5575.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5575


commit c671f6fe28a65acc18034785dc5566ac475c31f0
Author: Aaron Davidson aa...@databricks.com
Date:   2015-04-18T17:46:05Z

[SPARK-6955] Perform port retries at NettyBlockTransferService level

Currently we're doing port retries at the TransportServer level, but this 
is not specified by the TransportContext API and it has other further-reaching 
impacts like causing undesirable behavior for the Yarn and Standalone shuffle 
services.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6955] Perform port retries at NettyBloc...

2015-04-18 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5575#issuecomment-94188006
  
err, cc @andrewor14, not @andrewor13.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6955] Perform port retries at NettyBloc...

2015-04-18 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5575#issuecomment-94187977
  
cc @SaintBacchus @andrewor13 @vanzin This patch aims to resolve SPARK-6955 
while keeping the fix for SPARK-5444.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6955][NETWORK]Do not let Yarn Shuffle S...

2015-04-18 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5537#issuecomment-94187864
  
Ah, I realize now that the original change that brought this bug up (#4240) 
was perhaps not the right solution. The issue that #4240 solved was actually 
brought up beforehand 
(https://github.com/apache/spark/pull/3688#issuecomment-69588907). I think my 
recommended fix then would still be good, as it would not affect the Yarn or 
Standalone Worker shuffle services.

I created a patch which aims to resolve both #4240 and this PR's issues: 
#5575.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6955][NETWORK]Do not let Yarn Shuffle S...

2015-04-17 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5537#discussion_r28624950
  
--- Diff: core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala ---
@@ -43,8 +43,12 @@ object SparkTransportConf {
   * @param numUsableCores if nonzero, this will restrict the server and client threads to only
   *                       use the given number of cores, rather than all of the machine's cores.
   *                       This restriction will only occur if these properties are not already set.
+   * @param disablePortRetry if true, server will not retry its port. It's better for the long-run
+   *                         server to disable it since the server and client had the agreement of
+   *                         the specific port.
   */
-  def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
+  def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0,
--- End diff --

Definitely not meant to be public, could make it `private[spark]` and add 
exclusion.
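
Concretely, that suggestion would look something like the sketch below; the `disablePortRetry` parameter comes from the diff, while the default value and the stubbed body are assumptions, and the MiMa exclusion would be added separately:

```scala
// Visibility sketch only: keep the widened signature internal to Spark.
// (Enclosing object and imports omitted; this is not the final API.)
private[spark] def fromSparkConf(
    _conf: SparkConf,
    numUsableCores: Int = 0,
    disablePortRetry: Boolean = false): TransportConf = {
  ??? // construction of the TransportConf is omitted in this sketch
}
```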


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6955][NETWORK]Do not let Yarn Shuffle S...

2015-04-17 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5537#issuecomment-94062816
  
The changes as they currently stand LGTM, let's just fix up the style as 
@vanzin pointed out.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-16 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r28489592
  
--- Diff: core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala ---
@@ -49,18 +49,21 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
   private[this] var appId: String = _
 
   override def init(blockDataManager: BlockDataManager): Unit = {
-    val (rpcHandler: RpcHandler, bootstrap: Option[TransportClientBootstrap]) = {
-      val nettyRpcHandler = new NettyBlockRpcServer(serializer, blockDataManager)
-      if (!authEnabled) {
-        (nettyRpcHandler, None)
+    val rpcHandler = new NettyBlockRpcServer(serializer, blockDataManager)
+    val (serverBootstrap: Option[TransportServerBootstrap],
--- End diff --

nit: This syntax gets unwieldy with more code in here, I'd advise switching 
to something like
```scala
var serverBootstrap: Option[TransportServerBootstrap] = None
var clientBootstrap: Option[TransportClientBootstrap] = None
if (authEnabled) {
  serverBootstrap = Some(new SaslServerBootstrap(transportConf, securityManager))
  clientBootstrap = Some(new SaslClientBootstrap(transportConf, conf.getAppId, securityManager,
    securityManager.isSaslEncryptionEnabled()))
}
```

uses the dreaded `var` but I think it may be less syntactically cumbersome.
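
If the `var`s bother anyone, the same thing can also be written as one expression (a sketch that only reuses the constructors from the snippet above):

```scala
// Same logic, no mutation: both Options come out of a single if/else expression.
val (serverBootstrap, clientBootstrap) =
  if (authEnabled) {
    (Some(new SaslServerBootstrap(transportConf, securityManager)),
      Some(new SaslClientBootstrap(transportConf, conf.getAppId, securityManager,
        securityManager.isSaslEncryptionEnabled())))
  } else {
    (None, None)
  }
```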


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-16 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r28489665
  
--- Diff: network/common/src/main/java/org/apache/spark/network/TransportContext.java ---
@@ -55,14 +56,14 @@
   private final Logger logger = LoggerFactory.getLogger(TransportContext.class);
 
   private final TransportConf conf;
-  private final RpcHandler rpcHandler;
+  private final RpcHandler appRpcHandler;
--- End diff --

Why was this guy renamed? What is the app?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-16 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5377#issuecomment-93677780
  
This is looking very good to me. I've reviewed the core transport part and 
I like the API. I will have to defer reviewing the rest of the SASL Encryption 
side and the tests for a bit longer, though, so apologies on that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...

2015-04-16 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5377#discussion_r28491206
  
--- Diff: network/common/src/main/java/org/apache/spark/network/TransportContext.java ---
@@ -55,14 +56,14 @@
   private final Logger logger = LoggerFactory.getLogger(TransportContext.class);
 
   private final TransportConf conf;
-  private final RpcHandler rpcHandler;
+  private final RpcHandler appRpcHandler;
--- End diff --

Ah -- I see, this is the handler as it exists before the bootstraps run: the client 
uses it directly, and it serves as the original handler otherwise.
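
To illustrate the relationship (a sketch, not the actual TransportContext code; it assumes the `doBootstrap(channel, handler)` shape this PR introduces):

```scala
import io.netty.channel.Channel
import org.apache.spark.network.server.{RpcHandler, TransportServerBootstrap}

// Sketch: each server bootstrap wraps the handler produced by the previous one,
// starting from the application's handler; with no bootstraps, the app handler
// is used as-is, which matches how the client side talks to it directly.
def effectiveHandler(
    appRpcHandler: RpcHandler,
    channel: Channel,
    bootstraps: Seq[TransportServerBootstrap]): RpcHandler = {
  bootstraps.foldLeft(appRpcHandler) { (handler, bootstrap) =>
    bootstrap.doBootstrap(channel, handler)
  }
}
```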


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


