[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2017-11-26 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19788
  
Can we just add the `ContinuousShuffleBlockId` without adding the new conf 
`spark.shuffle.continuousFetch`? In classes related to shuffle read, like 
`ShuffleBlockFetcherIterator`, we could also pattern match the original 
`ShuffleBlockId`. This way no additional confs are needed.
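
A minimal sketch of this idea, assuming a hypothetical `ContinuousShuffleBlockId(shuffleId, mapId, reduceId, numBlocks)` whose field names are illustrative and not taken from the PR: the read side pattern matches both id types, so no extra conf is needed.

```
object ContiguousFetchSketch {
  sealed trait BlockId
  case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId
  // Hypothetical shape, for illustration only.
  case class ContinuousShuffleBlockId(
      shuffleId: Int, mapId: Int, reduceId: Int, numBlocks: Int) extends BlockId

  // A ShuffleBlockFetcherIterator-like reader can handle both shapes by pattern
  // matching, without a `spark.shuffle.continuousFetch`-style flag on the read path.
  def reduceIdsToFetch(id: BlockId): Seq[Int] = id match {
    case ShuffleBlockId(_, _, reduceId)              => Seq(reduceId)
    case ContinuousShuffleBlockId(_, _, reduceId, n) => reduceId until reduceId + n
  }
}
```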


---




[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...

2017-11-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19752#discussion_r153124078
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
 ---
@@ -158,111 +178,73 @@ abstract class CaseWhenBase(
 val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("")
 "CASE" + cases + elseCase + " END"
   }
-}
-
-
-/**
- * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE 
e] END".
- * When a = true, returns b; when c = true, returns d; else returns e.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE 
expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, 
returns `expr4`; else returns `expr5`.",
-  arguments = """
-Arguments:
-  * expr1, expr3 - the branch condition expressions should all be 
boolean type.
-  * expr2, expr4, expr5 - the branch value expressions and else value 
expression should all be
-  same type or coercible to a common type.
-  """,
-  examples = """
-Examples:
-  > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-   1
-  > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-   2
-  > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END;
-   NULL
-  """)
-// scalastyle:on line.size.limit
-case class CaseWhen(
-val branches: Seq[(Expression, Expression)],
-val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with CodegenFallback with 
Serializable {
-
-  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-super[CodegenFallback].doGenCode(ctx, ev)
-  }
-
-  def toCodegen(): CaseWhenCodegen = {
-CaseWhenCodegen(branches, elseValue)
-  }
-}
-
-/**
- * CaseWhen expression used when code generation condition is satisfied.
- * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-case class CaseWhenCodegen(
-val branches: Seq[(Expression, Expression)],
-val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with Serializable {
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-// Generate code that looks like:
-//
-// condA = ...
-// if (condA) {
-//   valueA
-// } else {
-//   condB = ...
-//   if (condB) {
-// valueB
-//   } else {
-// condC = ...
-// if (condC) {
-//   valueC
-// } else {
-//   elseValue
-// }
-//   }
-// }
+// This variable represents whether the first successful condition is 
met or not.
+// It is initialized to `false` and it is set to `true` when the first 
condition which
+// evaluates to `true` is met and therefore is not needed to go on 
anymore on the computation
+// of the following conditions.
+val conditionMet = ctx.freshName("caseWhenConditionMet")
+ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull)
+ctx.addMutableState(ctx.javaType(dataType), ev.value)
 val cases = branches.map { case (condExpr, valueExpr) =>
   val cond = condExpr.genCode(ctx)
   val res = valueExpr.genCode(ctx)
   s"""
-${cond.code}
-if (!${cond.isNull} && ${cond.value}) {
-  ${res.code}
-  ${ev.isNull} = ${res.isNull};
-  ${ev.value} = ${res.value};
+if(!$conditionMet) {
+  ${cond.code}
+  if (!${cond.isNull} && ${cond.value}) {
+${res.code}
+${ev.isNull} = ${res.isNull};
+${ev.value} = ${res.value};
+$conditionMet = true;
+  }
 }
   """
 }
 
-var generatedCode = cases.mkString("", "\nelse {\n", "\nelse {\n")
-
-elseValue.foreach { elseExpr =>
+val elseCode = elseValue.map { elseExpr =>
   val res = elseExpr.genCode(ctx)
-  generatedCode +=
-s"""
+  s"""
+if(!$conditionMet) {
   ${res.code}
   ${ev.isNull} = ${res.isNull};
   ${ev.value} = ${res.value};
-"""
+}
+  """
 }
 
-generatedCode += "}\n" * cases.size
+val 

[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...

2017-11-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19821#discussion_r153123637
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -785,13 +785,36 @@ class CodegenContext {
* @param expressions the codes to evaluate expressions.
*/
   def splitExpressions(row: String, expressions: Seq[String]): String = {
-if (row == null || currentVars != null) {
+if (INPUT_ROW == null || currentVars != null) {
   // Cannot split these expressions because they are not created from 
a row object.
   return expressions.mkString("\n")
 }
 splitExpressions(expressions, funcName = "apply", arguments = 
("InternalRow", row) :: Nil)
   }
 
+  /**
+   * Splits the generated code of expressions into multiple functions, 
because function has
+   * 64kb code size limit in JVM. This version takes care of INPUT_ROW and 
currentVars
+   *
+   * @param expressions the codes to evaluate expressions.
+   * @param funcName the split function name base.
+   * @param argumentsExceptRow the list of (type, name) of the arguments 
of the split function
+   *   except for ctx.INPUT_ROW
+  */
+  def splitExpressions(
+  expressions: Seq[String],
+  funcName: String,
+  argumentsExceptRow: Seq[(String, String)]): String = {
--- End diff --

Could you check the callers of case 3, which also need to check `INPUT_ROW` 
and `currentVars`? It sounds like some of them are missing that check.

In addition, cases `1` and `2` can easily be combined.

I think we need a different name for cases `1` and `2`. How about 
`splitExpressionsOnInputRow`?
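
A hedged, self-contained sketch of what the combined method could look like under that name; `INPUT_ROW`, `currentVars` and the lower-level split below are stand-ins for the real `CodegenContext` members, not the PR's code:

```
object SplitSketch {
  var INPUT_ROW: String = "i"
  var currentVars: Seq[String] = null

  // Stand-in for the general splitExpressions(expressions, funcName, arguments).
  private def doSplit(exprs: Seq[String], funcName: String, args: Seq[(String, String)]): String =
    s"// would split ${exprs.size} expressions into $funcName(...) taking (${args.map(_._1).mkString(", ")})"

  // Cases 1 and 2 merged under one name, with the INPUT_ROW/currentVars check done once.
  def splitExpressionsOnInputRow(
      expressions: Seq[String],
      funcName: String = "apply",
      extraArguments: Seq[(String, String)] = Nil): String = {
    if (INPUT_ROW == null || currentVars != null) {
      // Cannot split these expressions because they are not created from a row object.
      expressions.mkString("\n")
    } else {
      doSplit(expressions, funcName, ("InternalRow", INPUT_ROW) +: extraArguments)
    }
  }
}
```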

cc @cloud-fan 


---




[GitHub] spark pull request #18641: [SPARK-21413][SQL] Fix 64KB JVM bytecode limit pr...

2017-11-26 Thread kiszk
Github user kiszk closed the pull request at:

https://github.com/apache/spark/pull/18641


---




[GitHub] spark issue #18641: [SPARK-21413][SQL] Fix 64KB JVM bytecode limit problem i...

2017-11-26 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/18641
  
#19752 will cover this solution.


---




[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...

2017-11-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19752#discussion_r153123184
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
 ---
@@ -158,111 +178,73 @@ abstract class CaseWhenBase(
 val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("")
 "CASE" + cases + elseCase + " END"
   }
-}
-
-
-/**
- * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE 
e] END".
- * When a = true, returns b; when c = true, returns d; else returns e.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE 
expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, 
returns `expr4`; else returns `expr5`.",
-  arguments = """
-Arguments:
-  * expr1, expr3 - the branch condition expressions should all be 
boolean type.
-  * expr2, expr4, expr5 - the branch value expressions and else value 
expression should all be
-  same type or coercible to a common type.
-  """,
-  examples = """
-Examples:
-  > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-   1
-  > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-   2
-  > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END;
-   NULL
-  """)
-// scalastyle:on line.size.limit
-case class CaseWhen(
-val branches: Seq[(Expression, Expression)],
-val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with CodegenFallback with 
Serializable {
-
-  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-super[CodegenFallback].doGenCode(ctx, ev)
-  }
-
-  def toCodegen(): CaseWhenCodegen = {
-CaseWhenCodegen(branches, elseValue)
-  }
-}
-
-/**
- * CaseWhen expression used when code generation condition is satisfied.
- * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-case class CaseWhenCodegen(
-val branches: Seq[(Expression, Expression)],
-val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with Serializable {
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-// Generate code that looks like:
-//
-// condA = ...
-// if (condA) {
-//   valueA
-// } else {
-//   condB = ...
-//   if (condB) {
-// valueB
-//   } else {
-// condC = ...
-// if (condC) {
-//   valueC
-// } else {
-//   elseValue
-// }
-//   }
-// }
+// This variable represents whether the first successful condition is 
met or not.
+// It is initialized to `false` and it is set to `true` when the first 
condition which
+// evaluates to `true` is met and therefore is not needed to go on 
anymore on the computation
+// of the following conditions.
+val conditionMet = ctx.freshName("caseWhenConditionMet")
+ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull)
+ctx.addMutableState(ctx.javaType(dataType), ev.value)
 val cases = branches.map { case (condExpr, valueExpr) =>
   val cond = condExpr.genCode(ctx)
   val res = valueExpr.genCode(ctx)
   s"""
-${cond.code}
-if (!${cond.isNull} && ${cond.value}) {
-  ${res.code}
-  ${ev.isNull} = ${res.isNull};
-  ${ev.value} = ${res.value};
+if(!$conditionMet) {
+  ${cond.code}
+  if (!${cond.isNull} && ${cond.value}) {
+${res.code}
+${ev.isNull} = ${res.isNull};
+${ev.value} = ${res.value};
+$conditionMet = true;
+  }
 }
   """
 }
 
-var generatedCode = cases.mkString("", "\nelse {\n", "\nelse {\n")
-
-elseValue.foreach { elseExpr =>
+val elseCode = elseValue.map { elseExpr =>
   val res = elseExpr.genCode(ctx)
-  generatedCode +=
-s"""
+  s"""
+if(!$conditionMet) {
   ${res.code}
   ${ev.isNull} = ${res.isNull};
   ${ev.value} = ${res.value};
-"""
+}
+  """
 }
 
-generatedCode += "}\n" * cases.size
+val allConditions 

[GitHub] spark issue #19813: [WIP][SPARK-22600][SQL] Fix 64kb limit for deeply nested...

2017-11-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19813
  
**[Test build #84207 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84207/testReport)**
 for PR 19813 at commit 
[`9f848be`](https://github.com/apache/spark/commit/9f848be45dcc294d6f27f2c6eaeed1907d36f004).


---




[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...

2017-11-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19821#discussion_r153120458
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -785,13 +785,36 @@ class CodegenContext {
* @param expressions the codes to evaluate expressions.
*/
   def splitExpressions(row: String, expressions: Seq[String]): String = {
--- End diff --

I see. In addition to that, how about this, since many callers pass `INPUT_ROW`?
```
def splitExpressions(row: String, arguments: Seq[(String, String)] = ("InternalRow", INPUT_ROW) :: Nil): String = {
```


---




[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...

2017-11-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19821#discussion_r153120160
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -785,13 +785,36 @@ class CodegenContext {
* @param expressions the codes to evaluate expressions.
*/
   def splitExpressions(row: String, expressions: Seq[String]): String = {
-if (row == null || currentVars != null) {
+if (INPUT_ROW == null || currentVars != null) {
   // Cannot split these expressions because they are not created from 
a row object.
   return expressions.mkString("\n")
 }
 splitExpressions(expressions, funcName = "apply", arguments = 
("InternalRow", row) :: Nil)
   }
 
+  /**
+   * Splits the generated code of expressions into multiple functions, 
because function has
+   * 64kb code size limit in JVM. This version takes care of INPUT_ROW and 
currentVars
+   *
+   * @param expressions the codes to evaluate expressions.
+   * @param funcName the split function name base.
+   * @param argumentsExceptRow the list of (type, name) of the arguments 
of the split function
+   *   except for ctx.INPUT_ROW
+  */
+  def splitExpressions(
+  expressions: Seq[String],
+  funcName: String,
+  argumentsExceptRow: Seq[(String, String)]): String = {
--- End diff --

Now, we have three `splitExpressions` in this PR.
1. `splitExpression(row, expressions)`
2. `splitExpression(expressions, funcName, arguments)`
3. `splitExpression(expressions, funcName, arguments, returnType, ...)`

It is hard to combine 2. and 3., since 2. takes care of `INPUT_ROW` and 
`currentVars` while 3. does not.
Are you suggesting that I combine 1. and 2., which both take care of `INPUT_ROW` 
and `currentVars`?
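
For reference, the three shapes as Scala signatures, paraphrased from the list above (parameter lists abbreviated; not the exact code in the PR):

```
trait SplitVariants {
  // 1. Checks INPUT_ROW/currentVars; splits against a row object.
  def splitExpressions(row: String, expressions: Seq[String]): String
  // 2. Checks INPUT_ROW/currentVars; caller supplies the function name and arguments.
  def splitExpressions(
      expressions: Seq[String],
      funcName: String,
      argumentsExceptRow: Seq[(String, String)]): String
  // 3. No INPUT_ROW/currentVars check; fully general (return type and further options elided).
  def splitExpressions(
      expressions: Seq[String],
      funcName: String,
      arguments: Seq[(String, String)],
      returnType: String): String
}
```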


---




[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...

2017-11-26 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19764
  
@caneGuy Can you give a specific example to illustrate your change? Maybe the 
partition result before and after the change?


---




[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2017-11-26 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19788
  
@yucai I'm questioning the necessity of adding this new configuration 
`spark.shuffle.continuousFetch`, as mentioned above. The approach you proposed 
is actually a superset of the previous one: it is compatible with the original 
shuffle behavior when length = 1. The configuration here is only used to keep 
compatibility with the external shuffle service; I think it is not so intuitive, 
and users may be confused about whether it should be enabled or not (since the 
conf is functionality-oriented). Besides, do we need to guarantee forward 
compatibility, and is there a transparent way to automatically switch between 
the two shuffle paths without a configuration?


---




[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...

2017-11-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19752
  
**[Test build #84206 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84206/testReport)**
 for PR 19752 at commit 
[`f4c7896`](https://github.com/apache/spark/commit/f4c78965a8cee34e3be8b9d8e264f2d6eb0d27f5).


---




[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...

2017-11-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19752#discussion_r153118605
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
 ---
@@ -211,111 +231,73 @@ abstract class CaseWhenBase(
 val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("")
 "CASE" + cases + elseCase + " END"
   }
-}
-
-
-/**
- * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE 
e] END".
- * When a = true, returns b; when c = true, returns d; else returns e.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE 
expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, 
returns `expr4`; else returns `expr5`.",
-  arguments = """
-Arguments:
-  * expr1, expr3 - the branch condition expressions should all be 
boolean type.
-  * expr2, expr4, expr5 - the branch value expressions and else value 
expression should all be
-  same type or coercible to a common type.
-  """,
-  examples = """
-Examples:
-  > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-   1
-  > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-   2
-  > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END;
-   NULL
-  """)
-// scalastyle:on line.size.limit
-case class CaseWhen(
-val branches: Seq[(Expression, Expression)],
-val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with CodegenFallback with 
Serializable {
-
-  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-super[CodegenFallback].doGenCode(ctx, ev)
-  }
-
-  def toCodegen(): CaseWhenCodegen = {
-CaseWhenCodegen(branches, elseValue)
-  }
-}
-
-/**
- * CaseWhen expression used when code generation condition is satisfied.
- * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-case class CaseWhenCodegen(
-val branches: Seq[(Expression, Expression)],
-val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with Serializable {
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-// Generate code that looks like:
-//
-// condA = ...
-// if (condA) {
-//   valueA
-// } else {
-//   condB = ...
-//   if (condB) {
-// valueB
-//   } else {
-// condC = ...
-// if (condC) {
-//   valueC
-// } else {
-//   elseValue
-// }
-//   }
-// }
+// This variable represents whether the first successful condition is 
met or not.
+// It is initialized to `false` and it is set to `true` when the first 
condition which
+// evaluates to `true` is met and therefore is not needed to go on 
anymore on the computation
+// of the following conditions.
+val conditionMet = ctx.freshName("caseWhenConditionMet")
+ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull, "")
--- End diff --

Thanks, I branched from a version where there was no default value. I merged 
and fixed it.


---




[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...

2017-11-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19752#discussion_r153118387
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
 ---
@@ -211,111 +231,73 @@ abstract class CaseWhenBase(
 val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("")
 "CASE" + cases + elseCase + " END"
   }
-}
-
-
-/**
- * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE 
e] END".
- * When a = true, returns b; when c = true, returns d; else returns e.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE 
expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, 
returns `expr4`; else returns `expr5`.",
-  arguments = """
-Arguments:
-  * expr1, expr3 - the branch condition expressions should all be 
boolean type.
-  * expr2, expr4, expr5 - the branch value expressions and else value 
expression should all be
-  same type or coercible to a common type.
-  """,
-  examples = """
-Examples:
-  > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-   1
-  > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-   2
-  > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END;
-   NULL
-  """)
-// scalastyle:on line.size.limit
-case class CaseWhen(
-val branches: Seq[(Expression, Expression)],
-val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with CodegenFallback with 
Serializable {
-
-  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-super[CodegenFallback].doGenCode(ctx, ev)
-  }
-
-  def toCodegen(): CaseWhenCodegen = {
-CaseWhenCodegen(branches, elseValue)
-  }
-}
-
-/**
- * CaseWhen expression used when code generation condition is satisfied.
- * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-case class CaseWhenCodegen(
-val branches: Seq[(Expression, Expression)],
-val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with Serializable {
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-// Generate code that looks like:
-//
-// condA = ...
-// if (condA) {
-//   valueA
-// } else {
-//   condB = ...
-//   if (condB) {
-// valueB
-//   } else {
-// condC = ...
-// if (condC) {
-//   valueC
-// } else {
-//   elseValue
-// }
-//   }
-// }
+// This variable represents whether the first successful condition is 
met or not.
+// It is initialized to `false` and it is set to `true` when the first 
condition which
+// evaluates to `true` is met and therefore is not needed to go on 
anymore on the computation
+// of the following conditions.
+val conditionMet = ctx.freshName("caseWhenConditionMet")
+ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull, "")
+ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
 val cases = branches.map { case (condExpr, valueExpr) =>
   val cond = condExpr.genCode(ctx)
   val res = valueExpr.genCode(ctx)
   s"""
-${cond.code}
-if (!${cond.isNull} && ${cond.value}) {
-  ${res.code}
-  ${ev.isNull} = ${res.isNull};
-  ${ev.value} = ${res.value};
+if(!$conditionMet) {
+  ${cond.code}
+  if (!${cond.isNull} && ${cond.value}) {
+${res.code}
+${ev.isNull} = ${res.isNull};
+${ev.value} = ${res.value};
+$conditionMet = true;
+  }
 }
   """
 }
 
-var generatedCode = cases.mkString("", "\nelse {\n", "\nelse {\n")
-
-elseValue.foreach { elseExpr =>
+val elseCode = elseValue.map { elseExpr =>
   val res = elseExpr.genCode(ctx)
-  generatedCode +=
-s"""
+  s"""
+if(!$conditionMet) {
   ${res.code}
   ${ev.isNull} = ${res.isNull};
   ${ev.value} = ${res.value};
-"""
+}
+  """
 }
 
-generatedCode += "}\n" * cases.size
+val 

[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...

2017-11-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19752#discussion_r153118326
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
 ---
@@ -211,111 +231,73 @@ abstract class CaseWhenBase(
 val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("")
 "CASE" + cases + elseCase + " END"
   }
-}
-
-
-/**
- * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE 
e] END".
- * When a = true, returns b; when c = true, returns d; else returns e.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE 
expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, 
returns `expr4`; else returns `expr5`.",
-  arguments = """
-Arguments:
-  * expr1, expr3 - the branch condition expressions should all be 
boolean type.
-  * expr2, expr4, expr5 - the branch value expressions and else value 
expression should all be
-  same type or coercible to a common type.
-  """,
-  examples = """
-Examples:
-  > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-   1
-  > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-   2
-  > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END;
-   NULL
-  """)
-// scalastyle:on line.size.limit
-case class CaseWhen(
-val branches: Seq[(Expression, Expression)],
-val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with CodegenFallback with 
Serializable {
-
-  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-super[CodegenFallback].doGenCode(ctx, ev)
-  }
-
-  def toCodegen(): CaseWhenCodegen = {
-CaseWhenCodegen(branches, elseValue)
-  }
-}
-
-/**
- * CaseWhen expression used when code generation condition is satisfied.
- * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-case class CaseWhenCodegen(
-val branches: Seq[(Expression, Expression)],
-val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with Serializable {
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-// Generate code that looks like:
--- End diff --

I don't think it is necessary, since the generated code is now much simpler 
and more standard, and no comment like this is provided anywhere else. Anyway, if 
you feel it is needed, I can add it.
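
For readers of the diff, a hedged sketch of the evaluation pattern the new generated code follows, written as plain Scala instead of the generated Java (the names are illustrative):

```
def evalCaseWhen[T](
    branches: Seq[(() => Boolean, () => T)],
    elseValue: Option[() => T]): Option[T] = {
  // Flat structure: every branch is guarded by one `conditionMet` flag instead of
  // being nested in ever-deeper `else` blocks.
  var conditionMet = false
  var result: Option[T] = None
  branches.foreach { case (cond, value) =>
    if (!conditionMet && cond()) {
      result = Some(value())
      conditionMet = true
    }
  }
  if (!conditionMet) {
    result = elseValue.map(e => e())
  }
  result
}
```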


---




[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2017-11-26 Thread gczsjdy
Github user gczsjdy commented on the issue:

https://github.com/apache/spark/pull/19788
  
What is the `external shuffle service` here? Can you explain a little bit?


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-26 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r153117548
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala ---
@@ -116,8 +117,8 @@ object BlockId {
   def apply(name: String): BlockId = name match {
 case RDD(rddId, splitIndex) =>
   RDDBlockId(rddId.toInt, splitIndex.toInt)
-case SHUFFLE(shuffleId, mapId, reduceId) =>
-  ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
+case SHUFFLE(shuffleId, mapId, reduceId, n) =>
--- End diff --

Yes, good catch! I will change this after switching to `ContinuousShuffleBlockId`.
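
A hedged sketch of how `BlockId.apply` could keep a separate pattern for the contiguous case; the name format of the continuous id here is an assumption, not the PR's final choice:

```
import scala.util.matching.Regex

object BlockIdSketch {
  val SHUFFLE: Regex = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
  // Assumed format: the plain shuffle name plus a trailing block count.
  val CONTINUOUS_SHUFFLE: Regex = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)_([0-9]+)".r

  def describe(name: String): String = name match {
    case SHUFFLE(shuffleId, mapId, reduceId) =>
      s"ShuffleBlockId($shuffleId, $mapId, $reduceId)"
    case CONTINUOUS_SHUFFLE(shuffleId, mapId, reduceId, length) =>
      s"ContinuousShuffleBlockId($shuffleId, $mapId, $reduceId, $length)"
    case _ =>
      sys.error(s"Unrecognized BlockId: $name")
  }
}
```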


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-26 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r153117088
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala ---
@@ -116,8 +117,8 @@ object BlockId {
   def apply(name: String): BlockId = name match {
 case RDD(rddId, splitIndex) =>
   RDDBlockId(rddId.toInt, splitIndex.toInt)
-case SHUFFLE(shuffleId, mapId, reduceId) =>
-  ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
+case SHUFFLE(shuffleId, mapId, reduceId, n) =>
--- End diff --

nit: `length`?


---




[GitHub] spark pull request #11215: [SPARK-10969] [Streaming] [Kinesis] Allow specify...

2017-11-26 Thread kaklakariada
Github user kaklakariada closed the pull request at:

https://github.com/apache/spark/pull/11215


---




[GitHub] spark issue #11215: [SPARK-10969] [Streaming] [Kinesis] Allow specifying sep...

2017-11-26 Thread kaklakariada
Github user kaklakariada commented on the issue:

https://github.com/apache/spark/pull/11215
  
Solved with https://issues.apache.org/jira/browse/SPARK-19911 / #17250, see 
[this comment](https://issues.apache.org/jira/browse/SPARK-10969?focusedCommentId=16266374&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16266374).


---




[GitHub] spark issue #19819: [SPARK-22606][Streaming]Add threadId to the CachedKafkaC...

2017-11-26 Thread lvdongr
Github user lvdongr commented on the issue:

https://github.com/apache/spark/pull/19819
  
Will the number of cached consumers for the same partition keep increasing when 
different tasks consume the same partition, with no place to remove them?


---




[GitHub] spark pull request #19792: [SPARK-22566][PYTHON] Better error message for `_...

2017-11-26 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19792#discussion_r15398
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1108,19 +1109,23 @@ def _has_nulltype(dt):
 return isinstance(dt, NullType)
 
 
-def _merge_type(a, b):
+def _merge_type(a, b, path=''):
--- End diff --

Should we have a default path name for the case where we don't have names?
Otherwise, the error message would look like `TypeError: .arrayElement: Can 
not merge type  and `. WDYT? @HyukjinKwon 


---




[GitHub] spark pull request #19792: [SPARK-22566][PYTHON] Better error message for `_...

2017-11-26 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19792#discussion_r153110594
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -1722,6 +1723,83 @@ def test_infer_long_type(self):
 self.assertEqual(_infer_type(2**61), LongType())
 self.assertEqual(_infer_type(2**71), LongType())
 
+def test_merge_type(self):
+self.assertEqual(_merge_type(LongType(), NullType()), LongType())
+self.assertEqual(_merge_type(NullType(), LongType()), LongType())
+
+self.assertEqual(_merge_type(LongType(), LongType()), LongType())
+
+self.assertEqual(_merge_type(
+ArrayType(LongType()),
+ArrayType(LongType())
+), ArrayType(LongType()))
+with self.assertRaisesRegexp(TypeError, 'arrayElement'):
+_merge_type(ArrayType(LongType()), ArrayType(DoubleType()))
+
+self.assertEqual(_merge_type(
+MapType(StringType(), LongType()),
+MapType(StringType(), LongType())
+), MapType(StringType(), LongType()))
+with self.assertRaisesRegexp(TypeError, 'mapKey'):
+_merge_type(
+MapType(StringType(), LongType()),
+MapType(DoubleType(), LongType()))
+with self.assertRaisesRegexp(TypeError, 'mapValue'):
+_merge_type(
+MapType(StringType(), LongType()),
+MapType(StringType(), DoubleType()))
+
+self.assertEqual(_merge_type(
+StructType([StructField("f1", LongType()), StructField("f2", 
StringType())]),
+StructType([StructField("f1", LongType()), StructField("f2", 
StringType())])
+), StructType([StructField("f1", LongType()), StructField("f2", 
StringType())]))
+with self.assertRaisesRegexp(TypeError, r'structField\("f1"\)'):
--- End diff --

nit: We don't need `r` prefix for each regex for `assertRaisesRegexp`.


---




[GitHub] spark pull request #19815: [SPARK-22602][SQL] remove ColumnVector#loadBytes

2017-11-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19815


---




[GitHub] spark issue #19815: [SPARK-22602][SQL] remove ColumnVector#loadBytes

2017-11-26 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19815
  
Thanks! Merged to master.


---




[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-11-26 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r153110760
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala 
---
@@ -195,18 +196,26 @@ private[spark] class MetricsSystem private (
   val classPath = kv._2.getProperty("class")
   if (null != classPath) {
 try {
-  val sink = Utils.classForName(classPath)
-.getConstructor(classOf[Properties], classOf[MetricRegistry], 
classOf[SecurityManager])
-.newInstance(kv._2, registry, securityMgr)
+  val sink = Utils.classForName(classPath).getConstructor(
+classOf[Properties], classOf[MetricRegistry], 
classOf[SecurityManager])
+  .newInstance(kv._2, registry, securityMgr)
   if (kv._1 == "servlet") {
 metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
   } else {
 sinks += sink.asInstanceOf[Sink]
   }
 } catch {
-  case e: Exception =>
-logError("Sink class " + classPath + " cannot be instantiated")
-throw e
+  case _: NoSuchMethodException =>
+try {
+  sinks += Utils.classForName(classPath)
--- End diff --

No, not necessary, `MetricsServlet` is a built-in metrics sink which will 
be explicitly added in the above code.
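
A hedged, self-contained sketch of the constructor-fallback pattern in the quoted diff, with the Spark-specific types replaced by stand-ins so it compiles on its own:

```
import java.util.Properties

class Registry
trait Sink
class ThreeArgSink(props: Properties, registry: Registry, security: AnyRef) extends Sink
class TwoArgSink(props: Properties, registry: Registry) extends Sink

def instantiateSink(className: String, props: Properties,
    registry: Registry, security: AnyRef): Sink = {
  val clazz = Class.forName(className)
  try {
    // Preferred: the three-argument constructor used by the existing built-in sinks.
    clazz.getConstructor(classOf[Properties], classOf[Registry], classOf[Object])
      .newInstance(props, registry, security).asInstanceOf[Sink]
  } catch {
    case _: NoSuchMethodException =>
      // Fallback for user-provided sinks that only take (Properties, Registry).
      clazz.getConstructor(classOf[Properties], classOf[Registry])
        .newInstance(props, registry).asInstanceOf[Sink]
  }
}
```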


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19815: [SPARK-22602][SQL] remove ColumnVector#loadBytes

2017-11-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19815
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19815: [SPARK-22602][SQL] remove ColumnVector#loadBytes

2017-11-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19815
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84203/
Test PASSed.


---




[GitHub] spark issue #19815: [SPARK-22602][SQL] remove ColumnVector#loadBytes

2017-11-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19815
  
**[Test build #84203 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84203/testReport)**
 for PR 19815 at commit 
[`5711bb2`](https://github.com/apache/spark/commit/5711bb20e9d598c8716d7d8636466ee82e623a20).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2017-11-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/11994
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84202/
Test PASSed.


---




[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2017-11-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/11994
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2017-11-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/11994
  
**[Test build #84202 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84202/testReport)**
 for PR 11994 at commit 
[`f360dac`](https://github.com/apache/spark/commit/f360dac77499d7d0eebc09528255922c9315fb31).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-11-26 Thread sriramrajendiran
Github user sriramrajendiran commented on the issue:

https://github.com/apache/spark/pull/16578
  
@felixcheung can you help? We are hoping to see it in the 2.3 release. A feature 
behind a default-disabled flag looks like a safe option.


---




[GitHub] spark issue #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp...

2017-11-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19607
  
**[Test build #84205 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84205/testReport)**
 for PR 19607 at commit 
[`40a9735`](https://github.com/apache/spark/commit/40a9735b88deb85c08f618186daf9ed2152fc406).


---




[GitHub] spark issue #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp...

2017-11-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19607
  
**[Test build #84204 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84204/testReport)**
 for PR 19607 at commit 
[`f92eae3`](https://github.com/apache/spark/commit/f92eae35767a766ad80ac576a67f521e365549c7).


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-26 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153107748
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -997,6 +997,14 @@ object SQLConf {
   .intConf
   .createWithDefault(1)
 
+  val PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE =
+buildConf("spark.sql.execution.pandas.respectSessionTimeZone")
+  .internal()
+  .doc("When true, make Pandas DataFrame with timestamp type 
respecting session local " +
+"timezone when converting to/from Pandas DataFrame.")
--- End diff --

Sure, I'll update it.
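
For context, a hedged example of how the flag from the quoted diff (an internal conf) and the session time zone it refers to would be set from Scala; the values are illustrative only:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("pandas-timezone-demo")
  .getOrCreate()

// Internal flag from the quoted diff; its default and visibility are per the PR.
spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "true")
// The session-local time zone that the flag makes Pandas conversion respect.
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
```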


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-26 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153107765
  
--- Diff: python/setup.py ---
@@ -201,7 +201,7 @@ def _supports_symlinks():
 extras_require={
 'ml': ['numpy>=1.7'],
 'mllib': ['numpy>=1.7'],
-'sql': ['pandas>=0.13.0']
+'sql': ['pandas>=0.19.2']
--- End diff --

Sure, I'll add it.


---




[GitHub] spark issue #19815: [SPARK-22602][SQL] remove ColumnVector#loadBytes

2017-11-26 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19815
  
LGTM pending Jenkins.


---




[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2017-11-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/11994
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2017-11-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/11994
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84201/
Test FAILed.


---




[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2017-11-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/11994
  
**[Test build #84201 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84201/testReport)**
 for PR 11994 at commit 
[`38bc2a4`](https://github.com/apache/spark/commit/38bc2a43a69ffb0b40dc5ebb0a60568225eeb798).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...

2017-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19752#discussion_r153101065
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
 ---
@@ -211,111 +231,73 @@ abstract class CaseWhenBase(
 val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("")
 "CASE" + cases + elseCase + " END"
   }
-}
-
-
-/**
- * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE 
e] END".
- * When a = true, returns b; when c = true, returns d; else returns e.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE 
expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, 
returns `expr4`; else returns `expr5`.",
-  arguments = """
-Arguments:
-  * expr1, expr3 - the branch condition expressions should all be 
boolean type.
-  * expr2, expr4, expr5 - the branch value expressions and else value 
expression should all be
-  same type or coercible to a common type.
-  """,
-  examples = """
-Examples:
-  > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-   1
-  > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-   2
-  > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END;
-   NULL
-  """)
-// scalastyle:on line.size.limit
-case class CaseWhen(
-val branches: Seq[(Expression, Expression)],
-val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with CodegenFallback with 
Serializable {
-
-  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-super[CodegenFallback].doGenCode(ctx, ev)
-  }
-
-  def toCodegen(): CaseWhenCodegen = {
-CaseWhenCodegen(branches, elseValue)
-  }
-}
-
-/**
- * CaseWhen expression used when code generation condition is satisfied.
- * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-case class CaseWhenCodegen(
-val branches: Seq[(Expression, Expression)],
-val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with Serializable {
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-// Generate code that looks like:
--- End diff --

shall we keep this comment and update it?


---




[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...

2017-11-26 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19752
  
LGTM except a few minor comments


---




[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...

2017-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19752#discussion_r153100483
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
 ---
@@ -211,111 +231,73 @@ abstract class CaseWhenBase(
 val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("")
 "CASE" + cases + elseCase + " END"
   }
-}
-
-
-/**
- * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE 
e] END".
- * When a = true, returns b; when c = true, returns d; else returns e.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE 
expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, 
returns `expr4`; else returns `expr5`.",
-  arguments = """
-Arguments:
-  * expr1, expr3 - the branch condition expressions should all be 
boolean type.
-  * expr2, expr4, expr5 - the branch value expressions and else value 
expression should all be
-  same type or coercible to a common type.
-  """,
-  examples = """
-Examples:
-  > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-   1
-  > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-   2
-  > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END;
-   NULL
-  """)
-// scalastyle:on line.size.limit
-case class CaseWhen(
-val branches: Seq[(Expression, Expression)],
-val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with CodegenFallback with 
Serializable {
-
-  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-super[CodegenFallback].doGenCode(ctx, ev)
-  }
-
-  def toCodegen(): CaseWhenCodegen = {
-CaseWhenCodegen(branches, elseValue)
-  }
-}
-
-/**
- * CaseWhen expression used when code generation condition is satisfied.
- * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-case class CaseWhenCodegen(
-val branches: Seq[(Expression, Expression)],
-val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with Serializable {
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-// Generate code that looks like:
-//
-// condA = ...
-// if (condA) {
-//   valueA
-// } else {
-//   condB = ...
-//   if (condB) {
-// valueB
-//   } else {
-// condC = ...
-// if (condC) {
-//   valueC
-// } else {
-//   elseValue
-// }
-//   }
-// }
+// This variable represents whether the first successful condition is 
met or not.
+// It is initialized to `false` and it is set to `true` when the first 
condition which
+// evaluates to `true` is met and therefore is not needed to go on 
anymore on the computation
+// of the following conditions.
+val conditionMet = ctx.freshName("caseWhenConditionMet")
+ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull, "")
+ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
 val cases = branches.map { case (condExpr, valueExpr) =>
   val cond = condExpr.genCode(ctx)
   val res = valueExpr.genCode(ctx)
   s"""
-${cond.code}
-if (!${cond.isNull} && ${cond.value}) {
-  ${res.code}
-  ${ev.isNull} = ${res.isNull};
-  ${ev.value} = ${res.value};
+if(!$conditionMet) {
+  ${cond.code}
+  if (!${cond.isNull} && ${cond.value}) {
+${res.code}
+${ev.isNull} = ${res.isNull};
+${ev.value} = ${res.value};
+$conditionMet = true;
+  }
 }
   """
 }
 
-var generatedCode = cases.mkString("", "\nelse {\n", "\nelse {\n")
-
-elseValue.foreach { elseExpr =>
+val elseCode = elseValue.map { elseExpr =>
   val res = elseExpr.genCode(ctx)
-  generatedCode +=
-s"""
+  s"""
+if(!$conditionMet) {
   ${res.code}
   ${ev.isNull} = ${res.isNull};
   ${ev.value} = ${res.value};
-"""
+}
+  """
 }
 
-generatedCode += "}\n" * cases.size
+val 

[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...

2017-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19752#discussion_r153100103
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
 ---
@@ -211,111 +231,73 @@ abstract class CaseWhenBase(
 val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("")
 "CASE" + cases + elseCase + " END"
   }
-}
-
-
-/**
- * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE 
e] END".
- * When a = true, returns b; when c = true, returns d; else returns e.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE 
expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, 
returns `expr4`; else returns `expr5`.",
-  arguments = """
-Arguments:
-  * expr1, expr3 - the branch condition expressions should all be 
boolean type.
-  * expr2, expr4, expr5 - the branch value expressions and else value 
expression should all be
-  same type or coercible to a common type.
-  """,
-  examples = """
-Examples:
-  > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-   1
-  > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-   2
-  > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END;
-   NULL
-  """)
-// scalastyle:on line.size.limit
-case class CaseWhen(
-val branches: Seq[(Expression, Expression)],
-val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with CodegenFallback with 
Serializable {
-
-  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-super[CodegenFallback].doGenCode(ctx, ev)
-  }
-
-  def toCodegen(): CaseWhenCodegen = {
-CaseWhenCodegen(branches, elseValue)
-  }
-}
-
-/**
- * CaseWhen expression used when code generation condition is satisfied.
- * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-case class CaseWhenCodegen(
-val branches: Seq[(Expression, Expression)],
-val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with Serializable {
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-// Generate code that looks like:
-//
-// condA = ...
-// if (condA) {
-//   valueA
-// } else {
-//   condB = ...
-//   if (condB) {
-// valueB
-//   } else {
-// condC = ...
-// if (condC) {
-//   valueC
-// } else {
-//   elseValue
-// }
-//   }
-// }
+// This variable represents whether the first successful condition is 
met or not.
+// It is initialized to `false` and it is set to `true` when the first 
condition which
+// evaluates to `true` is met and therefore is not needed to go on 
anymore on the computation
+// of the following conditions.
+val conditionMet = ctx.freshName("caseWhenConditionMet")
+ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull, "")
--- End diff --

nit: `ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull)`, as empty string is 
the default value of the 3rd parameter.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...

2017-11-26 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19752
  
LGTM cc @cloud-fan @kiszk 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19714: [SPARK-22489][SQL] Shouldn't change broadcast join build...

2017-11-26 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19714
  
Just out of curiosity, what if users ask to broadcast both join sides in 
the hint? Shall we throw an exception, or pick the smaller side to broadcast 
according to the stats?

BTW, this is a behavior change; although it's more reasonable than before, we 
need to document it.
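
A hedged illustration of the scenario in question, using the public `broadcast` hint function on both sides (the DataFrames here are made up):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().master("local[*]").appName("broadcast-hint-demo").getOrCreate()
import spark.implicits._

val left = Seq((1, "a"), (2, "b")).toDF("id", "l")
val right = Seq((1, "x")).toDF("id", "r")

// Both sides carry a broadcast hint; the planner has to pick one side (or fail).
val joined = broadcast(left).join(broadcast(right), "id")
joined.explain()
```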


---




[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...

2017-11-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19821#discussion_r153099135
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -785,13 +785,36 @@ class CodegenContext {
* @param expressions the codes to evaluate expressions.
*/
   def splitExpressions(row: String, expressions: Seq[String]): String = {
-if (row == null || currentVars != null) {
+if (INPUT_ROW == null || currentVars != null) {
   // Cannot split these expressions because they are not created from 
a row object.
   return expressions.mkString("\n")
 }
 splitExpressions(expressions, funcName = "apply", arguments = 
("InternalRow", row) :: Nil)
   }
 
+  /**
+   * Splits the generated code of expressions into multiple functions, 
because function has
+   * 64kb code size limit in JVM. This version takes care of INPUT_ROW and 
currentVars
+   *
+   * @param expressions the codes to evaluate expressions.
+   * @param funcName the split function name base.
+   * @param argumentsExceptRow the list of (type, name) of the arguments 
of the split function
+   *   except for ctx.INPUT_ROW
+  */
+  def splitExpressions(
+  expressions: Seq[String],
+  funcName: String,
+  argumentsExceptRow: Seq[(String, String)]): String = {
--- End diff --

Could we combine them in the same function?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...

2017-11-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19821#discussion_r153098738
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -785,13 +785,36 @@ class CodegenContext {
* @param expressions the codes to evaluate expressions.
*/
   def splitExpressions(row: String, expressions: Seq[String]): String = {
-if (row == null || currentVars != null) {
+if (INPUT_ROW == null || currentVars != null) {
   // Cannot split these expressions because they are not created from 
a row object.
   return expressions.mkString("\n")
 }
 splitExpressions(expressions, funcName = "apply", arguments = 
("InternalRow", row) :: Nil)
   }
 
+  /**
+   * Splits the generated code of expressions into multiple functions, 
because function has
+   * 64kb code size limit in JVM. This version takes care of INPUT_ROW and 
currentVars
+   *
+   * @param expressions the codes to evaluate expressions.
+   * @param funcName the split function name base.
+   * @param argumentsExceptRow the list of (type, name) of the arguments 
of the split function
+   *   except for ctx.INPUT_ROW
+  */
+  def splitExpressions(
+  expressions: Seq[String],
+  funcName: String,
+  argumentsExceptRow: Seq[(String, String)]): String = {
+if (INPUT_ROW == null || currentVars != null) {
+  // Cannot split these expressions because they are not created from 
a row object.
+  return expressions.mkString("\n")
--- End diff --

If-else looks better.  Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-11-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r153098194
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala ---
@@ -25,27 +25,29 @@ import com.codahale.metrics.{ConsoleReporter, 
MetricRegistry}
 import org.apache.spark.SecurityManager
 import org.apache.spark.metrics.MetricsSystem
 
-private[spark] class ConsoleSink(val property: Properties, val registry: 
MetricRegistry,
-securityMgr: SecurityManager) extends Sink {
+private[spark] class ConsoleSink(
+property: Properties,
+registry: MetricRegistry,
+securityMgr: SecurityManager) extends Sink(property, registry) {
   val CONSOLE_DEFAULT_PERIOD = 10
   val CONSOLE_DEFAULT_UNIT = "SECONDS"
 
   val CONSOLE_KEY_PERIOD = "period"
   val CONSOLE_KEY_UNIT = "unit"
 
-  val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
+  private val pollPeriod = 
Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
--- End diff --

Emmm... This logic seems quite similar among the different Sinks; could we 
abstract it into the `Sink` class, or create a trait for it?
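For illustration, a minimal sketch of such a shared base, assuming the `period`/`unit` property keys and defaults used by `ConsoleSink`; this is not code from the PR.

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

import com.codahale.metrics.MetricRegistry

// Hypothetical base class that factors out the poll period/unit parsing shared by Sinks.
abstract class Sink(properties: Properties, registry: MetricRegistry) {
  // Defaults mirror ConsoleSink; subclasses could override them.
  protected def defaultPollPeriod: Int = 10
  protected def defaultPollUnit: TimeUnit = TimeUnit.SECONDS

  protected lazy val pollPeriod: Int =
    Option(properties.getProperty("period")).map(_.toInt).getOrElse(defaultPollPeriod)

  protected lazy val pollUnit: TimeUnit =
    Option(properties.getProperty("unit"))
      .map(u => TimeUnit.valueOf(u.toUpperCase))
      .getOrElse(defaultPollUnit)

  def start(): Unit
  def stop(): Unit
  def report(): Unit
}
```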


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-11-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r153097744
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala 
---
@@ -195,18 +196,26 @@ private[spark] class MetricsSystem private (
   val classPath = kv._2.getProperty("class")
   if (null != classPath) {
 try {
-  val sink = Utils.classForName(classPath)
-.getConstructor(classOf[Properties], classOf[MetricRegistry], 
classOf[SecurityManager])
-.newInstance(kv._2, registry, securityMgr)
+  val sink = Utils.classForName(classPath).getConstructor(
+classOf[Properties], classOf[MetricRegistry], 
classOf[SecurityManager])
+  .newInstance(kv._2, registry, securityMgr)
   if (kv._1 == "servlet") {
 metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
   } else {
 sinks += sink.asInstanceOf[Sink]
   }
 } catch {
-  case e: Exception =>
-logError("Sink class " + classPath + " cannot be instantiated")
-throw e
+  case _: NoSuchMethodException =>
+try {
+  sinks += Utils.classForName(classPath)
--- End diff --

Do we have to handle the case when `kv._1 == "servlet"` here?
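If the answer is yes, a hedged sketch of covering the servlet case in the fallback as well (a fragment relying on the surrounding fields shown in the diff, not the PR's code):

```scala
// Try the legacy three-argument constructor first, then fall back to the new
// two-argument Sink constructor; route the created sink the same way in both cases.
val sink = try {
  Utils.classForName(classPath)
    .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
    .newInstance(kv._2, registry, securityMgr)
} catch {
  case _: NoSuchMethodException =>
    Utils.classForName(classPath)
      .getConstructor(classOf[Properties], classOf[MetricRegistry])
      .newInstance(kv._2, registry)
}
if (kv._1 == "servlet") {
  metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
} else {
  sinks += sink.asInstanceOf[Sink]
}
```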


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-11-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r153097850
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala ---
@@ -25,27 +25,29 @@ import com.codahale.metrics.{ConsoleReporter, 
MetricRegistry}
 import org.apache.spark.SecurityManager
 import org.apache.spark.metrics.MetricsSystem
 
-private[spark] class ConsoleSink(val property: Properties, val registry: 
MetricRegistry,
-securityMgr: SecurityManager) extends Sink {
+private[spark] class ConsoleSink(
--- End diff --

nit: add comment for each param here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-11-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r153097471
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala 
---
@@ -195,18 +196,26 @@ private[spark] class MetricsSystem private (
   val classPath = kv._2.getProperty("class")
   if (null != classPath) {
 try {
-  val sink = Utils.classForName(classPath)
-.getConstructor(classOf[Properties], classOf[MetricRegistry], 
classOf[SecurityManager])
-.newInstance(kv._2, registry, securityMgr)
+  val sink = Utils.classForName(classPath).getConstructor(
+classOf[Properties], classOf[MetricRegistry], 
classOf[SecurityManager])
--- End diff --

Why make this format change?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-11-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r153098545
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala ---
@@ -17,8 +17,37 @@
 
 package org.apache.spark.metrics.sink
 
-private[spark] trait Sink {
+import java.util.Properties
+
+import com.codahale.metrics.MetricRegistry
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * The abstract class of a metrics Sink. By implementing its methods and registering
+ * the class through metrics.properties, users can plug custom metrics Sinks into the
+ * MetricsSystem.
+ *
+ * @param properties Properties related to this specific Sink, read from the metrics
+ *   configuration file; users can define their own configurations and
+ *   retrieve them from this parameter.
+ * @param metricRegistry The MetricRegistry for you to dump the collected 
metrics.
+ */
+@DeveloperApi
+abstract class Sink(properties: Properties, metricRegistry: 
MetricRegistry) {
+
+  /**
+   * Start this metrics Sink; this will be called by MetricsSystem
--- End diff --

Do we have to define the behavior when `start` fails?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19813: [WIP][SPARK-22600][SQL] Fix 64kb limit for deeply nested...

2017-11-26 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19813
  
If we have a clear rule, I think it makes more sense to do this in 
`CodegenContext`, i.e. having a `def splitExpressions(expressions: 
Seq[String]): String`, which automatically extracts the current inputs and puts 
them into the parameter list.
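A rough sketch of that API shape, under the assumption that the current input is simply `INPUT_ROW` whenever splitting is possible (an illustration, not the eventual implementation):

```scala
def splitExpressions(expressions: Seq[String]): String = {
  if (INPUT_ROW == null || currentVars != null) {
    // Bound to whole-stage-codegen inputs or no row object: cannot split into methods.
    expressions.mkString("\n")
  } else {
    // Derive the parameter list from the current inputs instead of asking callers for it.
    splitExpressions(expressions, funcName = "apply",
      arguments = ("InternalRow", INPUT_ROW) :: Nil)
  }
}
```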


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19714: [SPARK-22489][SQL] Shouldn't change broadcast join build...

2017-11-26 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19714
  
We also need to update the comment of `JoinSelection`.

cc @liufengdb @cloud-fan @rxin 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19815: [SPARK-22602][SQL] remove ColumnVector#loadBytes

2017-11-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19815
  
**[Test build #84203 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84203/testReport)**
 for PR 19815 at commit 
[`5711bb2`](https://github.com/apache/spark/commit/5711bb20e9d598c8716d7d8636466ee82e623a20).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153098067
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
 ---
@@ -223,4 +223,36 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
 assert(HashJoin.rewriteKeyExpr(l :: ss :: Nil) === l :: ss :: Nil)
 assert(HashJoin.rewriteKeyExpr(i :: ss :: Nil) === i :: ss :: Nil)
   }
+
+  test("Shouldn't change broadcast join buildSide if user clearly 
specified") {
+spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", 
"value").createTempView("table1")
+spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", 
"value").createTempView("table2")
+
+def assertJoinBuildSide(pair: (String, BuildSide)): Any = {
+  val (sqlString, s) = pair
+  val df = sql(sqlString)
+  val physical = df.queryExecution.executedPlan
+  physical match {
--- End diff --

Instead of doing `match`, can you just call `collect` and assert that the 
result size is 1?
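For example, a hedged sketch of that assertion style, assuming the plan contains a `BroadcastHashJoinExec` exposing its `buildSide` as in the existing physical joins:

```scala
def assertJoinBuildSide(sqlString: String, expected: BuildSide): Unit = {
  val plan = sql(sqlString).queryExecution.executedPlan
  // Collect the broadcast hash joins and check that exactly one exists with the
  // expected build side, instead of pattern matching on the whole plan shape.
  val buildSides = plan.collect { case j: BroadcastHashJoinExec => j.buildSide }
  assert(buildSides.size == 1)
  assert(buildSides.head == expected)
}
```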


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19818: [SPARK-22604][SQL] remove the get address methods...

2017-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19818#discussion_r153096313
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
 ---
@@ -62,13 +62,6 @@
*/
   public abstract boolean anyNullsSet();
 
-  /**
-   * Returns the off heap ptr for the arrays backing the NULLs and values 
buffer. Only valid
-   * to call for off heap columns.
--- End diff --

good idea


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19818: [SPARK-22604][SQL] remove the get address methods...

2017-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19818#discussion_r153096296
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 ---
@@ -73,12 +75,12 @@ public OffHeapColumnVector(int capacity, DataType type) 
{
 reset();
   }
 
-  @Override
+  @VisibleForTesting
   public long valuesNativeAddress() {
 return data;
   }
 
-  @Override
+  @VisibleForTesting
--- End diff --

they are called in benchmark


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...

2017-11-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19821#discussion_r153095567
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -785,13 +785,36 @@ class CodegenContext {
* @param expressions the codes to evaluate expressions.
*/
   def splitExpressions(row: String, expressions: Seq[String]): String = {
-if (row == null || currentVars != null) {
+if (INPUT_ROW == null || currentVars != null) {
   // Cannot split these expressions because they are not created from 
a row object.
   return expressions.mkString("\n")
 }
 splitExpressions(expressions, funcName = "apply", arguments = 
("InternalRow", row) :: Nil)
   }
 
+  /**
+   * Splits the generated code of expressions into multiple functions, 
because function has
+   * 64kb code size limit in JVM. This version takes care of INPUT_ROW and 
currentVars
+   *
+   * @param expressions the codes to evaluate expressions.
+   * @param funcName the split function name base.
+   * @param argumentsExceptRow the list of (type, name) of the arguments 
of the split function
+   *   except for ctx.INPUT_ROW
+  */
+  def splitExpressions(
+  expressions: Seq[String],
+  funcName: String,
+  argumentsExceptRow: Seq[(String, String)]): String = {
--- End diff --

I agree that it is good from the point of view of consistency. I have one question 
in my mind.

If we use the same argument name `arguments`, is it possible for a 
developer to distinguish this `splitExpressions` from the richer 
`splitExpressions` below when they want to pass only the three arguments `expressions`, 
`funcName`, and `arguments`?
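To make the concern concrete, a hypothetical side-by-side of the two overloads (the richer signature's extra parameters are assumptions for illustration only):

```scala
trait SplitApi {
  def splitExpressions(
      expressions: Seq[String],
      funcName: String,
      arguments: Seq[(String, String)]): String

  def splitExpressions(
      expressions: Seq[String],
      funcName: String,
      arguments: Seq[(String, String)],
      returnType: String,
      makeSplitFunction: String => String,
      foldFunctions: Seq[String] => String): String
}
// A call passing only `expressions`, `funcName`, and `arguments` resolves to the short
// overload, but a reader cannot tell at a glance which variant was intended; a distinct
// parameter name such as `argumentsExceptRow` keeps the intent explicit.
```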


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2017-11-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/11994
  
**[Test build #84202 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84202/testReport)**
 for PR 11994 at commit 
[`f360dac`](https://github.com/apache/spark/commit/f360dac77499d7d0eebc09528255922c9315fb31).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19814: [SPARK-22484][DOC] Document PySpark DataFrame csv writer...

2017-11-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19814
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84200/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19814: [SPARK-22484][DOC] Document PySpark DataFrame csv writer...

2017-11-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19814
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19814: [SPARK-22484][DOC] Document PySpark DataFrame csv writer...

2017-11-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19814
  
**[Test build #84200 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84200/testReport)**
 for PR 19814 at commit 
[`821ee19`](https://github.com/apache/spark/commit/821ee1993e26289bd5cc93be14543a8dbe913ff8).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...

2017-11-26 Thread caneGuy
Github user caneGuy commented on the issue:

https://github.com/apache/spark/pull/19764
  
Ping, could any admin help review this? Thanks


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2017-11-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/11994
  
**[Test build #84201 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84201/testReport)**
 for PR 11994 at commit 
[`38bc2a4`](https://github.com/apache/spark/commit/38bc2a43a69ffb0b40dc5ebb0a60568225eeb798).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17520: [WIP][SPARK-19712][SQL] Move PullupCorrelatedPredicates ...

2017-11-26 Thread wangyum
Github user wangyum commented on the issue:

https://github.com/apache/spark/pull/17520
  
@nsyca Can you resolve conflicts?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19717: [SPARK-18278] [Submission] Spark on Kubernetes - basic s...

2017-11-26 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/19717
  
+CC @srowen 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...

2017-11-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19821#discussion_r153092301
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -785,13 +785,36 @@ class CodegenContext {
* @param expressions the codes to evaluate expressions.
*/
   def splitExpressions(row: String, expressions: Seq[String]): String = {
-if (row == null || currentVars != null) {
+if (INPUT_ROW == null || currentVars != null) {
--- End diff --

good catch, thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...

2017-11-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19821#discussion_r153092158
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -785,13 +785,36 @@ class CodegenContext {
* @param expressions the codes to evaluate expressions.
*/
   def splitExpressions(row: String, expressions: Seq[String]): String = {
-if (row == null || currentVars != null) {
+if (INPUT_ROW == null || currentVars != null) {
   // Cannot split these expressions because they are not created from 
a row object.
   return expressions.mkString("\n")
 }
 splitExpressions(expressions, funcName = "apply", arguments = 
("InternalRow", row) :: Nil)
   }
 
+  /**
+   * Splits the generated code of expressions into multiple functions, 
because function has
+   * 64kb code size limit in JVM. This version takes care of INPUT_ROW and 
currentVars
+   *
+   * @param expressions the codes to evaluate expressions.
+   * @param funcName the split function name base.
+   * @param argumentsExceptRow the list of (type, name) of the arguments 
of the split function
+   *   except for ctx.INPUT_ROW
+  */
+  def splitExpressions(
+  expressions: Seq[String],
+  funcName: String,
+  argumentsExceptRow: Seq[(String, String)]): String = {
+if (INPUT_ROW == null || currentVars != null) {
+  // Cannot split these expressions because they are not created from 
a row object.
+  return expressions.mkString("\n")
--- End diff --

I followed the above `splitExpressions` in using `return`. Is it better 
for this place to write it as follows?
```
if (...) {
  expressions.mkString("\n")
} else {
  splitExpressions(...)
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19823: [SPARK-22601][SQL] Data load is getting displayed...

2017-11-26 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19823#discussion_r153091293
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
@@ -341,6 +341,12 @@ case class LoadDataCommand(
   } else {
 val uri = new URI(path)
 if (uri.getScheme() != null && uri.getAuthority() != null) {
+  val hadoopConf = sparkSession.sessionState.newHadoopConf()
+  val srcPath = new Path(path)
--- End diff --

nit: Let's use `new Path(uri)`. I think we'd better validate `uri` in this 
scope.
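A minimal sketch of the suggested shape, reusing `uri`, `path`, and `sparkSession` from the diff above; the exact error message is an assumption:

```scala
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val srcPath = new Path(uri)  // build the Path from the parsed URI, not the raw string
val fs = srcPath.getFileSystem(hadoopConf)
if (!fs.exists(srcPath)) {
  throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
}
```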


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153087493
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
 ---
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.Utils
+
+/**
+ * A factory class for configuring and creating executor pods.
+ */
+private[spark] trait ExecutorPodFactory {
+
+  /**
+   * Configure and construct an executor pod with the given parameters.
+   */
+  def createExecutorPod(
+  executorId: String,
+  applicationId: String,
+  driverUrl: String,
+  executorEnvs: Seq[(String, String)],
+  driverPod: Pod,
+  nodeToLocalTaskCount: Map[String, Int]): Pod
+}
+
+private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
+  extends ExecutorPodFactory {
+
+  import ExecutorPodFactoryImpl._
+
+  private val executorExtraClasspath =
+sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+
+  private val executorLabels = 
ConfigurationUtils.parsePrefixedKeyValuePairs(
+sparkConf,
+KUBERNETES_EXECUTOR_LABEL_PREFIX)
+  require(
+!executorLabels.contains(SPARK_APP_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
+  require(
+!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
+  " Spark.")
+  require(
+!executorLabels.contains(SPARK_ROLE_LABEL),
+s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is 
reserved for Spark.")
+
+  private val executorAnnotations =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
+  private val nodeSelector =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = 
sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+  private val executorPort = sparkConf.getInt("spark.executor.port", 
DEFAULT_STATIC_PORT)
+  private val blockManagerPort = sparkConf
+.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+
+  private val executorPodNamePrefix = 
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+
+  private val executorMemoryMiB = 
sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryString = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
+
+  private val memoryOverheadMiB = sparkConf
+.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val executorMemoryWithOverhead = executorMemoryMiB + 
memoryOverheadMiB
+
+  private val executorCores = sparkConf.getDouble("spark.executor.cores", 
1)
+  private val executorLimitCores = 
sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
+
+  override def createExecutorPod(
+  executorId: String,
+  applicationId: String,
+  driverUrl: String,
+  executorEnvs: Seq[(String, String)],
+  driverPod: Pod,
+  nodeToLocalTaskCount: Map[String, Int]): Pod = {
+val name = 

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153088979
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.File
+
+import io.fabric8.kubernetes.client.Config
+
+import org.apache.spark.SparkContext
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.{ExternalClusterManager, 
SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class KubernetesClusterManager extends 
ExternalClusterManager with Logging {
+
+  override def canCreate(masterURL: String): Boolean = 
masterURL.startsWith("k8s")
+
+  override def createTaskScheduler(sc: SparkContext, masterURL: String): 
TaskScheduler = {
+new TaskSchedulerImpl(sc)
+  }
+
+  override def createSchedulerBackend(
+  sc: SparkContext,
+  masterURL: String,
+  scheduler: TaskScheduler): SchedulerBackend = {
+val sparkConf = sc.getConf
+
+val kubernetesClient = 
SparkKubernetesClientFactory.createKubernetesClient(
+  KUBERNETES_MASTER_INTERNAL_URL,
+  Some(sparkConf.get(KUBERNETES_NAMESPACE)),
+  APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
+  sparkConf,
+  Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
+  Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
+
+val executorPodFactory = new ExecutorPodFactoryImpl(sparkConf)
+val allocatorExecutor = ThreadUtils
+  .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
+val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
+  "kubernetes-executor-requests")
--- End diff --

Why is this service passed in from outside?
It is an impl detail of `KubernetesClusterSchedulerBackend` and should be 
initialized/managed within it (created in the constructor, shut down in `stop`).
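A hedged sketch of that ownership (a fragment, not the PR's code): the backend creates the request pool itself and tears it down in `stop`.

```scala
// Created inside KubernetesClusterSchedulerBackend instead of being injected.
private val requestExecutorsService =
  ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")

private implicit val requestExecutorContext =
  ExecutionContext.fromExecutorService(requestExecutorsService)

override def stop(): Unit = {
  // Shut down the pool the backend owns before delegating to the parent stop logic.
  requestExecutorsService.shutdownNow()
  super.stop()
}
```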


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153088232
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
 ---
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.Utils
+
+/**
+ * A factory class for configuring and creating executor pods.
+ */
+private[spark] trait ExecutorPodFactory {
+
+  /**
+   * Configure and construct an executor pod with the given parameters.
+   */
+  def createExecutorPod(
+  executorId: String,
+  applicationId: String,
+  driverUrl: String,
+  executorEnvs: Seq[(String, String)],
+  driverPod: Pod,
+  nodeToLocalTaskCount: Map[String, Int]): Pod
+}
+
+private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
+  extends ExecutorPodFactory {
+
+  import ExecutorPodFactoryImpl._
+
+  private val executorExtraClasspath =
+sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+
+  private val executorLabels = 
ConfigurationUtils.parsePrefixedKeyValuePairs(
+sparkConf,
+KUBERNETES_EXECUTOR_LABEL_PREFIX)
+  require(
+!executorLabels.contains(SPARK_APP_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
+  require(
+!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
+  " Spark.")
+  require(
+!executorLabels.contains(SPARK_ROLE_LABEL),
+s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is 
reserved for Spark.")
+
+  private val executorAnnotations =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
+  private val nodeSelector =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = 
sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+  private val executorPort = sparkConf.getInt("spark.executor.port", 
DEFAULT_STATIC_PORT)
--- End diff --

Is this required? IIRC as of 2.0 the config was removed/is no longer used.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153091143
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+import javax.annotation.concurrent.GuardedBy
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, 
SchedulerBackendUtils}
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+requestExecutorsService)
+
+  private val driverPod = kubernetesClient.pods()
+.inNamespace(kubernetesNamespace)
+.withName(kubernetesDriverPodName)
+.get()
+
+  protected override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+conf.get("spark.driver.host"),
+conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = 
SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
+
+  private val podAllocationInterval = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val allocatorRunnable = new Runnable {
+
+// Maintains a map of executor id to count of checks performed to 
learn the loss reason
+// for an executor.
+private val executorReasonCheckAttemptCounts 

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153084210
  
--- Diff: resource-managers/kubernetes/core/pom.xml ---
@@ -0,0 +1,94 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-kubernetes_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Kubernetes</name>
+  <properties>
+    <sbt.project.name>kubernetes</sbt.project.name>
+    <kubernetes.client.version>3.0.0</kubernetes.client.version>
--- End diff --

Last time I asked about the client version, there were concerns regarding the 
maturity/stability of 3.x compared to the 2.2 (iirc) version which was in use - 
were they resolved?
I obviously prefer moving to the latest major version, but want to 
understand the risks involved, if any.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153087234
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
 ---
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.Utils
+
+/**
+ * A factory class for configuring and creating executor pods.
+ */
+private[spark] trait ExecutorPodFactory {
+
+  /**
+   * Configure and construct an executor pod with the given parameters.
+   */
+  def createExecutorPod(
+  executorId: String,
+  applicationId: String,
+  driverUrl: String,
+  executorEnvs: Seq[(String, String)],
+  driverPod: Pod,
+  nodeToLocalTaskCount: Map[String, Int]): Pod
+}
+
+private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
+  extends ExecutorPodFactory {
+
+  import ExecutorPodFactoryImpl._
+
+  private val executorExtraClasspath =
+sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+
+  private val executorLabels = 
ConfigurationUtils.parsePrefixedKeyValuePairs(
+sparkConf,
+KUBERNETES_EXECUTOR_LABEL_PREFIX)
+  require(
+!executorLabels.contains(SPARK_APP_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
+  require(
+!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
+  " Spark.")
+  require(
+!executorLabels.contains(SPARK_ROLE_LABEL),
+s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is 
reserved for Spark.")
+
+  private val executorAnnotations =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
+  private val nodeSelector =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = 
sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+  private val executorPort = sparkConf.getInt("spark.executor.port", 
DEFAULT_STATIC_PORT)
+  private val blockManagerPort = sparkConf
+.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+
+  private val executorPodNamePrefix = 
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+
+  private val executorMemoryMiB = 
sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryString = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
+
+  private val memoryOverheadMiB = sparkConf
+.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val executorMemoryWithOverhead = executorMemoryMiB + 
memoryOverheadMiB
+
+  private val executorCores = sparkConf.getDouble("spark.executor.cores", 
1)
+  private val executorLimitCores = 
sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
+
+  override def createExecutorPod(
+  executorId: String,
+  applicationId: String,
+  driverUrl: String,
+  executorEnvs: Seq[(String, String)],
+  driverPod: Pod,
+  nodeToLocalTaskCount: Map[String, Int]): Pod = {
+val name = 

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153089366
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+import javax.annotation.concurrent.GuardedBy
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, 
SchedulerBackendUtils}
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+requestExecutorsService)
+
+  private val driverPod = kubernetesClient.pods()
+.inNamespace(kubernetesNamespace)
+.withName(kubernetesDriverPodName)
+.get()
+
+  protected override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+conf.get("spark.driver.host"),
+conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = 
SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
+
+  private val podAllocationInterval = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val allocatorRunnable = new Runnable {
+
+// Maintains a map of executor id to count of checks performed to 
learn the loss reason
+// for an executor.
+private val executorReasonCheckAttemptCounts 

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153089805
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+import javax.annotation.concurrent.GuardedBy
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, 
SchedulerBackendUtils}
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+requestExecutorsService)
+
+  private val driverPod = kubernetesClient.pods()
+.inNamespace(kubernetesNamespace)
+.withName(kubernetesDriverPodName)
+.get()
+
+  protected override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+conf.get("spark.driver.host"),
+conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = 
SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
+
+  private val podAllocationInterval = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val allocatorRunnable = new Runnable {
+
+// Maintains a map of executor id to count of checks performed to 
learn the loss reason
+// for an executor.
+private val executorReasonCheckAttemptCounts 

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153089664
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+import javax.annotation.concurrent.GuardedBy
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, 
SchedulerBackendUtils}
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+requestExecutorsService)
+
+  private val driverPod = kubernetesClient.pods()
+.inNamespace(kubernetesNamespace)
+.withName(kubernetesDriverPodName)
+.get()
+
+  protected override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+conf.get("spark.driver.host"),
+conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = 
SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
+
+  private val podAllocationInterval = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val allocatorRunnable = new Runnable {
+
+// Maintains a map of executor id to count of checks performed to learn the loss reason
+// for an executor.
+private val executorReasonCheckAttemptCounts 

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153087550
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.util.Utils
+
+/**
+ * Configures executor pods. Construct one of these with a SparkConf to set up properties that are
+ * common across all executors. Then, pass in dynamic parameters into createExecutorPod.
+ */
+private[spark] trait ExecutorPodFactory {
+  def createExecutorPod(
+  executorId: String,
+  applicationId: String,
+  driverUrl: String,
+  executorEnvs: Seq[(String, String)],
+  driverPod: Pod,
+  nodeToLocalTaskCount: Map[String, Int]): Pod
+}
+
+private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
+  extends ExecutorPodFactory {
+
+  import ExecutorPodFactoryImpl._
+
+  private val executorExtraClasspath = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+  private val executorJarsDownloadDir = 
sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
+
+  private val executorLabels = 
ConfigurationUtils.parsePrefixedKeyValuePairs(
+sparkConf,
+KUBERNETES_EXECUTOR_LABEL_PREFIX,
+"executor label")
+  require(
+!executorLabels.contains(SPARK_APP_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
+  require(
+!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
+  s" Spark.")
+
+  private val executorAnnotations =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
+  "executor annotation")
+  private val nodeSelector =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_NODE_SELECTOR_PREFIX,
+  "node selector")
+
+  private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = 
sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+  private val executorPort = sparkConf.getInt("spark.executor.port", 
DEFAULT_STATIC_PORT)
+  private val blockmanagerPort = sparkConf
+.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+  private val kubernetesDriverPodName = sparkConf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod name"))
+
+  private val executorPodNamePrefix = 
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+
+  private val executorMemoryMiB = 
sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryString = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
+
+  private val memoryOverheadMiB = sparkConf
+.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val executorMemoryWithOverhead = executorMemoryMiB + 
memoryOverheadMiB
+
+  private val executorCores = sparkConf.getDouble("spark.executor.cores", 
1d)
+  private val executorLimitCores = 
sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)
+
+  override def createExecutorPod(
+  executorId: String,

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153084631
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using" +
+" spark-submit in cluster mode, this can also be passed to 
spark-submit via the" +
+" --kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag" +
+" format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$SPARK_VERSION")
--- End diff --

Should this default `spark-executor:$SPARK_VERSION` be present? I would expect the image name always to be specified explicitly by the admin (in spark-defaults) or overridden by the user.
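
For illustration only, a rough sketch of the alternative being suggested (drop the default and fail fast when no image is configured); the config wiring and the error message below are assumptions, not code from this PR:

```Scala
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical: the same entry without a spark-executor:$SPARK_VERSION fallback.
val EXECUTOR_DOCKER_IMAGE =
  ConfigBuilder("spark.kubernetes.executor.docker.image")
    .doc("Docker image to use for the executors. Specify this using the standard Docker tag format.")
    .stringConf
    .createOptional

// Hypothetical call site: surface a clear error instead of silently using a default.
def resolveExecutorImage(sparkConf: SparkConf): String =
  sparkConf.get(EXECUTOR_DOCKER_IMAGE).getOrElse(
    throw new SparkException("spark.kubernetes.executor.docker.image must be set explicitly"))
```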


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153089121
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+import javax.annotation.concurrent.GuardedBy
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, 
SchedulerBackendUtils}
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+requestExecutorsService)
+
+  private val driverPod = kubernetesClient.pods()
+.inNamespace(kubernetesNamespace)
+.withName(kubernetesDriverPodName)
+.get()
+
+  protected override val minRegisteredRatio =
+if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
--- End diff --

We need to update docs/configuration.md to specify that this setting is now used by both YARN and Kubernetes.
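
For context, a hedged example of the setting the doc entry would cover; any explicit value in (0, 1] overrides the Kubernetes-specific 0.8 default shown above (illustrative, not from this PR):

```Scala
import org.apache.spark.SparkConf

// Hypothetical: a user raising the registration threshold before scheduling begins.
val conf = new SparkConf()
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.9")
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "60s")
```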


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153084513
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using" +
+" spark-submit in cluster mode, this can also be passed to 
spark-submit via the" +
+" --kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag" +
+" format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$SPARK_VERSION")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+  .doc("Kubernetes image pull policy. Valid values are Always, Never, 
and IfNotPresent.")
+  .stringConf
+  .checkValues(Set("Always", "Never", "IfNotPresent"))
+  .createWithDefault("IfNotPresent")
+
+  val APISERVER_AUTH_DRIVER_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver"
+  val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver.mounted"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
+ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+  .doc("Service account that is used when running the driver pod. The 
driver pod uses" +
+" this service account when requesting executor pods from the API 
server. If specific" +
+" credentials are given for the driver pod to use, the driver will 
favor" +
+" using those credentials instead.")
+  .stringConf
+  .createOptional
+
+  // Note that while we set a default for this when we start up the
+  // scheduler, the specific default value is dynamically determined
+  // based on the executor memory.
+  val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
+ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
+  .doc("The amount of off-heap memory (in megabytes) to be allocated 
per executor. This" +
+" is memory that accounts for things like VM overheads, interned 
strings, other native" +
+" overheads, etc. This tends to grow with the executor size. 
(typically 6-10%).")
+  .bytesConf(ByteUnit.MiB)
+  .createOptional
--- End diff --

What about driver memory overhead?
I see that Mesos does not support it, while YARN does. Is it relevant here?
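
For discussion, a hedged sketch of what an analogous driver-side setting could look like; the key name and the resolution logic are assumptions that mirror the executor entry above, not anything in this PR:

```Scala
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

// Hypothetical driver-side counterpart to spark.kubernetes.executor.memoryOverhead.
val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
  ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
    .doc("The amount of off-heap memory (in megabytes) to be allocated for the driver pod.")
    .bytesConf(ByteUnit.MiB)
    .createOptional

// Hypothetical default when unset, mirroring the executor-side formula.
def driverMemoryOverheadMiB(driverMemoryMiB: Long): Long =
  math.max((0.10 * driverMemoryMiB).toLong, 384L)  // factor and minimum are stand-ins
```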


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153090634
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
 ---
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.Utils
+
+/**
+ * A factory class for configuring and creating executor pods.
+ */
+private[spark] trait ExecutorPodFactory {
+
+  /**
+   * Configure and construct an executor pod with the given parameters.
+   */
+  def createExecutorPod(
+  executorId: String,
+  applicationId: String,
+  driverUrl: String,
+  executorEnvs: Seq[(String, String)],
+  driverPod: Pod,
+  nodeToLocalTaskCount: Map[String, Int]): Pod
+}
+
+private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
+  extends ExecutorPodFactory {
+
+  import ExecutorPodFactoryImpl._
+
+  private val executorExtraClasspath =
+sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+
+  private val executorLabels = 
ConfigurationUtils.parsePrefixedKeyValuePairs(
+sparkConf,
+KUBERNETES_EXECUTOR_LABEL_PREFIX)
+  require(
+!executorLabels.contains(SPARK_APP_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
+  require(
+!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
+  " Spark.")
+  require(
+!executorLabels.contains(SPARK_ROLE_LABEL),
+s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is 
reserved for Spark.")
+
+  private val executorAnnotations =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
+  private val nodeSelector =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_NODE_SELECTOR_PREFIX)
+
+  private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = 
sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+  private val executorPort = sparkConf.getInt("spark.executor.port", 
DEFAULT_STATIC_PORT)
+  private val blockManagerPort = sparkConf
+.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+
+  private val executorPodNamePrefix = 
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+
+  private val executorMemoryMiB = 
sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryString = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
+
+  private val memoryOverheadMiB = sparkConf
+.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val executorMemoryWithOverhead = executorMemoryMiB + 
memoryOverheadMiB
+
+  private val executorCores = sparkConf.getDouble("spark.executor.cores", 
1)
+  private val executorLimitCores = 
sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
+
+  override def createExecutorPod(
+  executorId: String,
+  applicationId: String,
+  driverUrl: String,
+  executorEnvs: Seq[(String, String)],
+  driverPod: Pod,
+  nodeToLocalTaskCount: Map[String, Int]): Pod = {
+val name = 

[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2017-11-26 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/11994
  
@felixcheung thanks for the review. I think there's no next step; the current changes should be enough for users to externalize customized metrics sources and sinks.
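
As a sketch of what externalizing a custom source could look like once these interfaces are public; this assumes the Source trait keeps its historical shape (a sourceName plus a Dropwizard MetricRegistry) and is not an example from this PR:

```Scala
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source

// Hypothetical user-defined source plugged in through the exposed interface.
class MyAppSource extends Source {
  override val sourceName: String = "myApp"
  override val metricRegistry: MetricRegistry = new MetricRegistry

  // Register a simple gauge; the measurement below is a placeholder.
  metricRegistry.register(MetricRegistry.name("queueDepth"), new Gauge[Int] {
    override def getValue: Int = currentQueueDepth()
  })

  private def currentQueueDepth(): Int = 0  // replace with a real measurement
}
```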


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19816: [SPARK-21693][R][FOLLOWUP] Reduce shuffle partiti...

2017-11-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19816


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19816: [SPARK-21693][R][FOLLOWUP] Reduce shuffle partitions run...

2017-11-26 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19816
  
Thanks @felixcheung.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19816: [SPARK-21693][R][FOLLOWUP] Reduce shuffle partitions run...

2017-11-26 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19816
  
Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-26 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r153089584
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
 // The block is actually going to be a range of a single map output file for this map, so
 // find out the consolidated file, then the offset within that from our index
+logDebug(s"Fetch block data for $blockId")
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
 val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
 try {
   ByteStreams.skipFully(in, blockId.reduceId * 8)
   val offset = in.readLong()
+  ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --

I get your point, thanks for the explanation.
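
For readers of this thread, a small standalone sketch of the index-file arithmetic this hunk relies on; the helper name and signature are illustrative, not the PR's code. The index file stores numPartitions + 1 cumulative offsets, so a run of `length` contiguous reduce partitions starting at `reduceId` spans [offsets(reduceId), offsets(reduceId + length)):

```Scala
import java.io.DataInputStream
import java.nio.file.{Files, Paths}

import com.google.common.io.ByteStreams

// Hypothetical helper: read the byte range covered by `length` contiguous partitions.
def contiguousRange(indexPath: String, reduceId: Int, length: Int): (Long, Long) = {
  val in = new DataInputStream(Files.newInputStream(Paths.get(indexPath)))
  try {
    ByteStreams.skipFully(in, reduceId * 8L)      // seek to offsets(reduceId)
    val startOffset = in.readLong()
    ByteStreams.skipFully(in, (length - 1) * 8L)  // seek to offsets(reduceId + length)
    val endOffset = in.readLong()
    (startOffset, endOffset)
  } finally {
    in.close()
  }
}
```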


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...

2017-11-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19821#discussion_r153087888
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -785,13 +785,36 @@ class CodegenContext {
* @param expressions the codes to evaluate expressions.
*/
   def splitExpressions(row: String, expressions: Seq[String]): String = {
-if (row == null || currentVars != null) {
+if (INPUT_ROW == null || currentVars != null) {
   // Cannot split these expressions because they are not created from a row object.
   return expressions.mkString("\n")
 }
 splitExpressions(expressions, funcName = "apply", arguments = ("InternalRow", row) :: Nil)
   }
 
+  /**
+   * Splits the generated code of expressions into multiple functions, 
because function has
+   * 64kb code size limit in JVM. This version takes care of INPUT_ROW and 
currentVars
+   *
+   * @param expressions the codes to evaluate expressions.
+   * @param funcName the split function name base.
+   * @param argumentsExceptRow the list of (type, name) of the arguments of the split function
+   *   except for ctx.INPUT_ROW
+  */
+  def splitExpressions(
+  expressions: Seq[String],
+  funcName: String,
+  argumentsExceptRow: Seq[(String, String)]): String = {
+if (INPUT_ROW == null || currentVars != null) {
+  // Cannot split these expressions because they are not created from a row object.
+  return expressions.mkString("\n")
--- End diff --

Could we avoid using `return`?
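
A minimal self-contained illustration of that suggestion; the names below stand in for the CodegenContext fields and for the splitting logic, they are not Spark's API:

```Scala
// Sketch only: replace the early return with an if/else expression.
object SplitSketch {
  def split(
      expressions: Seq[String],
      inputRow: String,                    // stand-in for ctx.INPUT_ROW
      currentVars: AnyRef,                 // stand-in for ctx.currentVars
      doSplit: Seq[String] => String): String = {
    if (inputRow == null || currentVars != null) {
      // Cannot split: the expressions are not created from a row object.
      expressions.mkString("\n")
    } else {
      doSplit(expressions)                 // the actual splitting logic, elided here
    }
  }
}
```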


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...

2017-11-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19821#discussion_r153087865
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -785,13 +785,36 @@ class CodegenContext {
* @param expressions the codes to evaluate expressions.
*/
   def splitExpressions(row: String, expressions: Seq[String]): String = {
-if (row == null || currentVars != null) {
+if (INPUT_ROW == null || currentVars != null) {
   // Cannot split these expressions because they are not created from a row object.
   return expressions.mkString("\n")
 }
 splitExpressions(expressions, funcName = "apply", arguments = ("InternalRow", row) :: Nil)
   }
 
+  /**
+   * Splits the generated code of expressions into multiple functions, because function has
+   * 64kb code size limit in JVM. This version takes care of INPUT_ROW and currentVars
+   *
+   * @param expressions the codes to evaluate expressions.
+   * @param funcName the split function name base.
+   * @param argumentsExceptRow the list of (type, name) of the arguments of the split function
+   *   except for ctx.INPUT_ROW
+  */
+  def splitExpressions(
+  expressions: Seq[String],
+  funcName: String,
+  argumentsExceptRow: Seq[(String, String)]): String = {
--- End diff --

Let the caller also provide `ctx.INPUT_ROW`?
Change it to
```Scala
arguments: Seq[(String, String)]
```
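
Spelled out a little further, one possible shape of that change; the names and the placeholder body are illustrative, not the PR's final code:

```Scala
// Sketch: the general overload takes the full argument list (including the row, if any),
// and the existing row-based overload simply prepends ("InternalRow", row).
object SplitSignatures {
  def splitExpressions(
      expressions: Seq[String],
      funcName: String,
      arguments: Seq[(String, String)]): String = {
    // Placeholder body: a real implementation would emit one `funcName`-prefixed helper
    // per chunk, each taking `arguments`, and return the code that calls them in order.
    expressions.mkString("\n")
  }

  def splitExpressions(row: String, expressions: Seq[String]): String =
    splitExpressions(expressions, funcName = "apply", arguments = ("InternalRow", row) :: Nil)
}
```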


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...

2017-11-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19752
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84199/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...

2017-11-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19752
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...

2017-11-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19821#discussion_r153087808
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -785,13 +785,36 @@ class CodegenContext {
* @param expressions the codes to evaluate expressions.
*/
   def splitExpressions(row: String, expressions: Seq[String]): String = {
--- End diff --

To make it consistent, how about changing it to
```Scala
 def splitExpressions(row: String, arguments: Seq[(String, String)]): String = {
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...

2017-11-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19752
  
**[Test build #84199 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84199/testReport)**
 for PR 19752 at commit 
[`9063583`](https://github.com/apache/spark/commit/9063583bce77348b9da61abec6e9fb5ae7aef117).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...

2017-11-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19752
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84198/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...

2017-11-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19752
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...

2017-11-26 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19752
  
**[Test build #84198 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84198/testReport)**
 for PR 19752 at commit 
[`f9c20be`](https://github.com/apache/spark/commit/f9c20bea19e1e03394a976c90012fc8744267065).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


