[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user gczsjdy commented on the issue: https://github.com/apache/spark/pull/19788 Can we just add the `ContinuousShuffleBlockId` without adding the new conf `spark.shuffle.continuousFetch`? In the classes related to shuffle read, such as `ShuffleBlockFetcherIterator`, we could also pattern match the former `ShuffleBlockId`. This way no additional confs are needed.
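For readers following this suggestion, here is a minimal sketch of what matching both block id shapes on the read path could look like. The `ContinuousShuffleBlockId` case class and its `length` field are assumptions based on this thread, not the final Spark API:

```scala
// Sketch only: a single block maps to one reduce partition, while a contiguous
// block covers `length` consecutive partitions starting at `reduceId`.
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
case class ContinuousShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int, length: Int)

def reducePartitions(blockId: Any): Seq[Int] = blockId match {
  case ShuffleBlockId(_, _, reduceId) => Seq(reduceId)
  case ContinuousShuffleBlockId(_, _, reduceId, length) => reduceId until (reduceId + length)
  case _ => Seq.empty
}

// reducePartitions(ContinuousShuffleBlockId(0, 3, 5, 4)) returns partitions 5, 6, 7, 8
```

Because the single-block case is just the contiguous case with length = 1, the read side can handle both shapes in one pattern match, which is the basis of the no-extra-conf argument above.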
[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19752#discussion_r153124078 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- @@ -158,111 +178,73 @@ abstract class CaseWhenBase( val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("") "CASE" + cases + elseCase + " END" } -} - - -/** - * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END". - * When a = true, returns b; when c = true, returns d; else returns e. - * - * @param branches seq of (branch condition, branch value) - * @param elseValue optional value for the else branch - */ -// scalastyle:off line.size.limit -@ExpressionDescription( - usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, returns `expr4`; else returns `expr5`.", - arguments = """ -Arguments: - * expr1, expr3 - the branch condition expressions should all be boolean type. - * expr2, expr4, expr5 - the branch value expressions and else value expression should all be - same type or coercible to a common type. - """, - examples = """ -Examples: - > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END; - 1 - > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END; - 2 - > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END; - NULL - """) -// scalastyle:on line.size.limit -case class CaseWhen( -val branches: Seq[(Expression, Expression)], -val elseValue: Option[Expression] = None) - extends CaseWhenBase(branches, elseValue) with CodegenFallback with Serializable { - - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -super[CodegenFallback].doGenCode(ctx, ev) - } - - def toCodegen(): CaseWhenCodegen = { -CaseWhenCodegen(branches, elseValue) - } -} - -/** - * CaseWhen expression used when code generation condition is satisfied. - * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen. - * - * @param branches seq of (branch condition, branch value) - * @param elseValue optional value for the else branch - */ -case class CaseWhenCodegen( -val branches: Seq[(Expression, Expression)], -val elseValue: Option[Expression] = None) - extends CaseWhenBase(branches, elseValue) with Serializable { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -// Generate code that looks like: -// -// condA = ... -// if (condA) { -// valueA -// } else { -// condB = ... -// if (condB) { -// valueB -// } else { -// condC = ... -// if (condC) { -// valueC -// } else { -// elseValue -// } -// } -// } +// This variable represents whether the first successful condition is met or not. +// It is initialized to `false` and it is set to `true` when the first condition which +// evaluates to `true` is met and therefore is not needed to go on anymore on the computation +// of the following conditions. 
+val conditionMet = ctx.freshName("caseWhenConditionMet") +ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull) +ctx.addMutableState(ctx.javaType(dataType), ev.value) val cases = branches.map { case (condExpr, valueExpr) => val cond = condExpr.genCode(ctx) val res = valueExpr.genCode(ctx) s""" -${cond.code} -if (!${cond.isNull} && ${cond.value}) { - ${res.code} - ${ev.isNull} = ${res.isNull}; - ${ev.value} = ${res.value}; +if(!$conditionMet) { + ${cond.code} + if (!${cond.isNull} && ${cond.value}) { +${res.code} +${ev.isNull} = ${res.isNull}; +${ev.value} = ${res.value}; +$conditionMet = true; + } } """ } -var generatedCode = cases.mkString("", "\nelse {\n", "\nelse {\n") - -elseValue.foreach { elseExpr => +val elseCode = elseValue.map { elseExpr => val res = elseExpr.genCode(ctx) - generatedCode += -s""" + s""" +if(!$conditionMet) { ${res.code} ${ev.isNull} = ${res.isNull}; ${ev.value} = ${res.value}; -""" +} + """ } -generatedCode += "}\n" * cases.size +val
[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19821#discussion_r153123637 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -785,13 +785,36 @@ class CodegenContext { * @param expressions the codes to evaluate expressions. */ def splitExpressions(row: String, expressions: Seq[String]): String = { -if (row == null || currentVars != null) { +if (INPUT_ROW == null || currentVars != null) { // Cannot split these expressions because they are not created from a row object. return expressions.mkString("\n") } splitExpressions(expressions, funcName = "apply", arguments = ("InternalRow", row) :: Nil) } + /** + * Splits the generated code of expressions into multiple functions, because function has + * 64kb code size limit in JVM. This version takes care of INPUT_ROW and currentVars + * + * @param expressions the codes to evaluate expressions. + * @param funcName the split function name base. + * @param argumentsExceptRow the list of (type, name) of the arguments of the split function + * except for ctx.INPUT_ROW + */ + def splitExpressions( + expressions: Seq[String], + funcName: String, + argumentsExceptRow: Seq[(String, String)]): String = { --- End diff -- Could you check the caller of case 3 which also need to check `INPUT_ROW` and `currentVars`? It sounds like some of them miss the checking. In addition, case `1` and `2` can be easily combined. I think we need a different name for case `1` and `2`. How about `splitExpressionsOnInputRow`? cc @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
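A standalone sketch of what combining cases 1 and 2 under the suggested name might look like; the class below is a simplified stand-in for `CodegenContext`, and the default arguments are illustrative assumptions rather than the actual Spark signature:

```scala
// Simplified stand-in, not the real CodegenContext.
class CodegenContextSketch {
  var INPUT_ROW: String = "i"     // null when there is no single input row
  var currentVars: AnyRef = null  // non-null when generating from bound variables

  // One guarded entry point covering both of the old INPUT_ROW-aware overloads.
  def splitExpressionsOnInputRow(
      expressions: Seq[String],
      funcName: String = "apply",
      argumentsExceptRow: Seq[(String, String)] = Nil): String = {
    if (INPUT_ROW == null || currentVars != null) {
      // Cannot split: the code is not evaluated against a single row object.
      expressions.mkString("\n")
    } else {
      // Always pass the row as the first argument of the split functions.
      splitExpressions(expressions, funcName, ("InternalRow", INPUT_ROW) +: argumentsExceptRow)
    }
  }

  // Placeholder for the lower-level splitter (case 3), which takes full control of its arguments.
  private def splitExpressions(
      expressions: Seq[String],
      funcName: String,
      arguments: Seq[(String, String)]): String = expressions.mkString("\n")
}
```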
[GitHub] spark pull request #18641: [SPARK-21413][SQL] Fix 64KB JVM bytecode limit pr...
Github user kiszk closed the pull request at: https://github.com/apache/spark/pull/18641
[GitHub] spark issue #18641: [SPARK-21413][SQL] Fix 64KB JVM bytecode limit problem i...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/18641 #19752 will cover this solution.
[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19752#discussion_r153123184 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- @@ -158,111 +178,73 @@ abstract class CaseWhenBase( val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("") "CASE" + cases + elseCase + " END" } -} - - -/** - * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END". - * When a = true, returns b; when c = true, returns d; else returns e. - * - * @param branches seq of (branch condition, branch value) - * @param elseValue optional value for the else branch - */ -// scalastyle:off line.size.limit -@ExpressionDescription( - usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, returns `expr4`; else returns `expr5`.", - arguments = """ -Arguments: - * expr1, expr3 - the branch condition expressions should all be boolean type. - * expr2, expr4, expr5 - the branch value expressions and else value expression should all be - same type or coercible to a common type. - """, - examples = """ -Examples: - > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END; - 1 - > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END; - 2 - > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END; - NULL - """) -// scalastyle:on line.size.limit -case class CaseWhen( -val branches: Seq[(Expression, Expression)], -val elseValue: Option[Expression] = None) - extends CaseWhenBase(branches, elseValue) with CodegenFallback with Serializable { - - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -super[CodegenFallback].doGenCode(ctx, ev) - } - - def toCodegen(): CaseWhenCodegen = { -CaseWhenCodegen(branches, elseValue) - } -} - -/** - * CaseWhen expression used when code generation condition is satisfied. - * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen. - * - * @param branches seq of (branch condition, branch value) - * @param elseValue optional value for the else branch - */ -case class CaseWhenCodegen( -val branches: Seq[(Expression, Expression)], -val elseValue: Option[Expression] = None) - extends CaseWhenBase(branches, elseValue) with Serializable { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -// Generate code that looks like: -// -// condA = ... -// if (condA) { -// valueA -// } else { -// condB = ... -// if (condB) { -// valueB -// } else { -// condC = ... -// if (condC) { -// valueC -// } else { -// elseValue -// } -// } -// } +// This variable represents whether the first successful condition is met or not. +// It is initialized to `false` and it is set to `true` when the first condition which +// evaluates to `true` is met and therefore is not needed to go on anymore on the computation +// of the following conditions. 
+val conditionMet = ctx.freshName("caseWhenConditionMet") +ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull) +ctx.addMutableState(ctx.javaType(dataType), ev.value) val cases = branches.map { case (condExpr, valueExpr) => val cond = condExpr.genCode(ctx) val res = valueExpr.genCode(ctx) s""" -${cond.code} -if (!${cond.isNull} && ${cond.value}) { - ${res.code} - ${ev.isNull} = ${res.isNull}; - ${ev.value} = ${res.value}; +if(!$conditionMet) { + ${cond.code} + if (!${cond.isNull} && ${cond.value}) { +${res.code} +${ev.isNull} = ${res.isNull}; +${ev.value} = ${res.value}; +$conditionMet = true; + } } """ } -var generatedCode = cases.mkString("", "\nelse {\n", "\nelse {\n") - -elseValue.foreach { elseExpr => +val elseCode = elseValue.map { elseExpr => val res = elseExpr.genCode(ctx) - generatedCode += -s""" + s""" +if(!$conditionMet) { ${res.code} ${ev.isNull} = ${res.isNull}; ${ev.value} = ${res.value}; -""" +} + """ } -generatedCode += "}\n" * cases.size +val allConditions
[GitHub] spark issue #19813: [WIP][SPARK-22600][SQL] Fix 64kb limit for deeply nested...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19813 **[Test build #84207 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84207/testReport)** for PR 19813 at commit [`9f848be`](https://github.com/apache/spark/commit/9f848be45dcc294d6f27f2c6eaeed1907d36f004).
[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19821#discussion_r153120458 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -785,13 +785,36 @@ class CodegenContext { * @param expressions the codes to evaluate expressions. */ def splitExpressions(row: String, expressions: Seq[String]): String = { --- End diff -- I see. In addition to that, how about this, since many callers pass `INPUT_ROW`? ``` def splitExpressions(row: String, arguments: Seq[(String, String)] = Seq(("InternalRow", INPUT_ROW))): String = { ```
[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19821#discussion_r153120160 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -785,13 +785,36 @@ class CodegenContext { * @param expressions the codes to evaluate expressions. */ def splitExpressions(row: String, expressions: Seq[String]): String = { -if (row == null || currentVars != null) { +if (INPUT_ROW == null || currentVars != null) { // Cannot split these expressions because they are not created from a row object. return expressions.mkString("\n") } splitExpressions(expressions, funcName = "apply", arguments = ("InternalRow", row) :: Nil) } + /** + * Splits the generated code of expressions into multiple functions, because function has + * 64kb code size limit in JVM. This version takes care of INPUT_ROW and currentVars + * + * @param expressions the codes to evaluate expressions. + * @param funcName the split function name base. + * @param argumentsExceptRow the list of (type, name) of the arguments of the split function + * except for ctx.INPUT_ROW + */ + def splitExpressions( + expressions: Seq[String], + funcName: String, + argumentsExceptRow: Seq[(String, String)]): String = { --- End diff -- Now, we have three `splitExpressions` in this PR. 1. `splitExpression(row, expressions)` 2. `splitExpression(expressions, funcName, arguments)` 3. `splitExpression(expressions, funcName, arguments, returnType, ...)` It is hard to combine 2. and 3. since 2. takes care of `INPUT_ROW` and `currentVars` while 3. does not take care of them. Are you suggesting me to combine 1. and 2. which take care of `INPUT_ROW` and `currentVars`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...
Github user gczsjdy commented on the issue: https://github.com/apache/spark/pull/19764 @caneGuy Can you give a specific example to illustrate your change? Maybe the partitioning result before and after your change?
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19788 @yucai I'm questioning the necessity of adding the new configuration `spark.shuffle.continuousFetch` that you mentioned above. The approach you proposed is actually a superset of the previous one: it is compatible with the original shuffle path when length = 1. The configuration here is only used to keep compatibility with the external shuffle service; I think it is not very intuitive, and users may be confused about whether it should be enabled or not (since the conf is functionality-oriented). Besides, do we need to guarantee forward compatibility? Also, is there a transparent way to automatically switch between the two shuffle paths without a configuration?
[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19752 **[Test build #84206 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84206/testReport)** for PR 19752 at commit [`f4c7896`](https://github.com/apache/spark/commit/f4c78965a8cee34e3be8b9d8e264f2d6eb0d27f5).
[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19752#discussion_r153118605 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- @@ -211,111 +231,73 @@ abstract class CaseWhenBase( val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("") "CASE" + cases + elseCase + " END" } -} - - -/** - * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END". - * When a = true, returns b; when c = true, returns d; else returns e. - * - * @param branches seq of (branch condition, branch value) - * @param elseValue optional value for the else branch - */ -// scalastyle:off line.size.limit -@ExpressionDescription( - usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, returns `expr4`; else returns `expr5`.", - arguments = """ -Arguments: - * expr1, expr3 - the branch condition expressions should all be boolean type. - * expr2, expr4, expr5 - the branch value expressions and else value expression should all be - same type or coercible to a common type. - """, - examples = """ -Examples: - > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END; - 1 - > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END; - 2 - > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END; - NULL - """) -// scalastyle:on line.size.limit -case class CaseWhen( -val branches: Seq[(Expression, Expression)], -val elseValue: Option[Expression] = None) - extends CaseWhenBase(branches, elseValue) with CodegenFallback with Serializable { - - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -super[CodegenFallback].doGenCode(ctx, ev) - } - - def toCodegen(): CaseWhenCodegen = { -CaseWhenCodegen(branches, elseValue) - } -} - -/** - * CaseWhen expression used when code generation condition is satisfied. - * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen. - * - * @param branches seq of (branch condition, branch value) - * @param elseValue optional value for the else branch - */ -case class CaseWhenCodegen( -val branches: Seq[(Expression, Expression)], -val elseValue: Option[Expression] = None) - extends CaseWhenBase(branches, elseValue) with Serializable { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -// Generate code that looks like: -// -// condA = ... -// if (condA) { -// valueA -// } else { -// condB = ... -// if (condB) { -// valueB -// } else { -// condC = ... -// if (condC) { -// valueC -// } else { -// elseValue -// } -// } -// } +// This variable represents whether the first successful condition is met or not. +// It is initialized to `false` and it is set to `true` when the first condition which +// evaluates to `true` is met and therefore is not needed to go on anymore on the computation +// of the following conditions. +val conditionMet = ctx.freshName("caseWhenConditionMet") +ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull, "") --- End diff -- thanks, I branched from a version when there was no default value. I merged and fixed it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19752#discussion_r153118387 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- @@ -211,111 +231,73 @@ abstract class CaseWhenBase( val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("") "CASE" + cases + elseCase + " END" } -} - - -/** - * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END". - * When a = true, returns b; when c = true, returns d; else returns e. - * - * @param branches seq of (branch condition, branch value) - * @param elseValue optional value for the else branch - */ -// scalastyle:off line.size.limit -@ExpressionDescription( - usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, returns `expr4`; else returns `expr5`.", - arguments = """ -Arguments: - * expr1, expr3 - the branch condition expressions should all be boolean type. - * expr2, expr4, expr5 - the branch value expressions and else value expression should all be - same type or coercible to a common type. - """, - examples = """ -Examples: - > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END; - 1 - > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END; - 2 - > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END; - NULL - """) -// scalastyle:on line.size.limit -case class CaseWhen( -val branches: Seq[(Expression, Expression)], -val elseValue: Option[Expression] = None) - extends CaseWhenBase(branches, elseValue) with CodegenFallback with Serializable { - - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -super[CodegenFallback].doGenCode(ctx, ev) - } - - def toCodegen(): CaseWhenCodegen = { -CaseWhenCodegen(branches, elseValue) - } -} - -/** - * CaseWhen expression used when code generation condition is satisfied. - * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen. - * - * @param branches seq of (branch condition, branch value) - * @param elseValue optional value for the else branch - */ -case class CaseWhenCodegen( -val branches: Seq[(Expression, Expression)], -val elseValue: Option[Expression] = None) - extends CaseWhenBase(branches, elseValue) with Serializable { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -// Generate code that looks like: -// -// condA = ... -// if (condA) { -// valueA -// } else { -// condB = ... -// if (condB) { -// valueB -// } else { -// condC = ... -// if (condC) { -// valueC -// } else { -// elseValue -// } -// } -// } +// This variable represents whether the first successful condition is met or not. +// It is initialized to `false` and it is set to `true` when the first condition which +// evaluates to `true` is met and therefore is not needed to go on anymore on the computation +// of the following conditions. 
+val conditionMet = ctx.freshName("caseWhenConditionMet") +ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull, "") +ctx.addMutableState(ctx.javaType(dataType), ev.value, "") val cases = branches.map { case (condExpr, valueExpr) => val cond = condExpr.genCode(ctx) val res = valueExpr.genCode(ctx) s""" -${cond.code} -if (!${cond.isNull} && ${cond.value}) { - ${res.code} - ${ev.isNull} = ${res.isNull}; - ${ev.value} = ${res.value}; +if(!$conditionMet) { + ${cond.code} + if (!${cond.isNull} && ${cond.value}) { +${res.code} +${ev.isNull} = ${res.isNull}; +${ev.value} = ${res.value}; +$conditionMet = true; + } } """ } -var generatedCode = cases.mkString("", "\nelse {\n", "\nelse {\n") - -elseValue.foreach { elseExpr => +val elseCode = elseValue.map { elseExpr => val res = elseExpr.genCode(ctx) - generatedCode += -s""" + s""" +if(!$conditionMet) { ${res.code} ${ev.isNull} = ${res.isNull}; ${ev.value} = ${res.value}; -""" +} + """ } -generatedCode += "}\n" * cases.size +val
[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19752#discussion_r153118326 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- @@ -211,111 +231,73 @@ abstract class CaseWhenBase( val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("") "CASE" + cases + elseCase + " END" } -} - - -/** - * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END". - * When a = true, returns b; when c = true, returns d; else returns e. - * - * @param branches seq of (branch condition, branch value) - * @param elseValue optional value for the else branch - */ -// scalastyle:off line.size.limit -@ExpressionDescription( - usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, returns `expr4`; else returns `expr5`.", - arguments = """ -Arguments: - * expr1, expr3 - the branch condition expressions should all be boolean type. - * expr2, expr4, expr5 - the branch value expressions and else value expression should all be - same type or coercible to a common type. - """, - examples = """ -Examples: - > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END; - 1 - > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END; - 2 - > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END; - NULL - """) -// scalastyle:on line.size.limit -case class CaseWhen( -val branches: Seq[(Expression, Expression)], -val elseValue: Option[Expression] = None) - extends CaseWhenBase(branches, elseValue) with CodegenFallback with Serializable { - - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -super[CodegenFallback].doGenCode(ctx, ev) - } - - def toCodegen(): CaseWhenCodegen = { -CaseWhenCodegen(branches, elseValue) - } -} - -/** - * CaseWhen expression used when code generation condition is satisfied. - * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen. - * - * @param branches seq of (branch condition, branch value) - * @param elseValue optional value for the else branch - */ -case class CaseWhenCodegen( -val branches: Seq[(Expression, Expression)], -val elseValue: Option[Expression] = None) - extends CaseWhenBase(branches, elseValue) with Serializable { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -// Generate code that looks like: --- End diff -- I don't think it is necessary since now the generated code is way easier and more standard and nowhere else a comment like this is provided. Anyway, if you feel it is needed, I can add it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
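For readers following the diff, a toy sketch (not the actual Spark source) of the flat structure the new codegen produces: every WHEN branch becomes an independent `if (!conditionMet)` block instead of one deeply nested if/else chain, which is what makes the output easier to split into separate methods:

```scala
// Build the per-branch blocks the way the diff above describes; the emitted Java is
// pseudocode here, and the names mirror the discussion (conditionMet, branches).
val conditionMet = "caseWhenConditionMet"
val branches = Seq(("condA", "valueA"), ("condB", "valueB"), ("condC", "valueC"))

val generated = branches.map { case (cond, value) =>
  s"""if (!$conditionMet) {
     |  boolean $cond = /* evaluate this branch's condition */ false;
     |  if ($cond) {
     |    result = "$value";          // evaluate and assign this branch's value
     |    $conditionMet = true;       // all later branches are skipped
     |  }
     |}""".stripMargin
}.mkString("\n")

println(generated)
```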
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user gczsjdy commented on the issue: https://github.com/apache/spark/pull/19788 What is the `external shuffle service` here? Can you explain a little bit?
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r153117548 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala --- @@ -116,8 +117,8 @@ object BlockId { def apply(name: String): BlockId = name match { case RDD(rddId, splitIndex) => RDDBlockId(rddId.toInt, splitIndex.toInt) -case SHUFFLE(shuffleId, mapId, reduceId) => - ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) +case SHUFFLE(shuffleId, mapId, reduceId, n) => --- End diff -- Yes, good catch! I will change this here after switching to `ContinuousShuffleBlockId`.
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r153117088 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala --- @@ -116,8 +117,8 @@ object BlockId { def apply(name: String): BlockId = name match { case RDD(rddId, splitIndex) => RDDBlockId(rddId.toInt, splitIndex.toInt) -case SHUFFLE(shuffleId, mapId, reduceId) => - ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt) +case SHUFFLE(shuffleId, mapId, reduceId, n) => --- End diff -- nit: rename `n` to `length`?
[GitHub] spark pull request #11215: [SPARK-10969] [Streaming] [Kinesis] Allow specify...
Github user kaklakariada closed the pull request at: https://github.com/apache/spark/pull/11215
[GitHub] spark issue #11215: [SPARK-10969] [Streaming] [Kinesis] Allow specifying sep...
Github user kaklakariada commented on the issue: https://github.com/apache/spark/pull/11215 Solved with https://issues.apache.org/jira/browse/SPARK-19911 / #17250, see [this comment](https://issues.apache.org/jira/browse/SPARK-10969?focusedCommentId=16266374&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16266374).
[GitHub] spark issue #19819: [SPARK-22606][Streaming]Add threadId to the CachedKafkaC...
Github user lvdongr commented on the issue: https://github.com/apache/spark/pull/19819 Will the number of cached consumers for the same partition keep increasing when different tasks consume the same partition, with no place to remove them?
[GitHub] spark pull request #19792: [SPARK-22566][PYTHON] Better error message for `_...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19792#discussion_r15398 --- Diff: python/pyspark/sql/types.py --- @@ -1108,19 +1109,23 @@ def _has_nulltype(dt): return isinstance(dt, NullType) -def _merge_type(a, b): +def _merge_type(a, b, path=''): --- End diff -- Should we have the default path name for the case we don't have names? Otherwise, the error message would be like `TypeError: .arrayElement: Can not merge type and `. WDYT? @HyukjinKwon --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19792: [SPARK-22566][PYTHON] Better error message for `_...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19792#discussion_r153110594 --- Diff: python/pyspark/sql/tests.py --- @@ -1722,6 +1723,83 @@ def test_infer_long_type(self): self.assertEqual(_infer_type(2**61), LongType()) self.assertEqual(_infer_type(2**71), LongType()) +def test_merge_type(self): +self.assertEqual(_merge_type(LongType(), NullType()), LongType()) +self.assertEqual(_merge_type(NullType(), LongType()), LongType()) + +self.assertEqual(_merge_type(LongType(), LongType()), LongType()) + +self.assertEqual(_merge_type( +ArrayType(LongType()), +ArrayType(LongType()) +), ArrayType(LongType())) +with self.assertRaisesRegexp(TypeError, 'arrayElement'): +_merge_type(ArrayType(LongType()), ArrayType(DoubleType())) + +self.assertEqual(_merge_type( +MapType(StringType(), LongType()), +MapType(StringType(), LongType()) +), MapType(StringType(), LongType())) +with self.assertRaisesRegexp(TypeError, 'mapKey'): +_merge_type( +MapType(StringType(), LongType()), +MapType(DoubleType(), LongType())) +with self.assertRaisesRegexp(TypeError, 'mapValue'): +_merge_type( +MapType(StringType(), LongType()), +MapType(StringType(), DoubleType())) + +self.assertEqual(_merge_type( +StructType([StructField("f1", LongType()), StructField("f2", StringType())]), +StructType([StructField("f1", LongType()), StructField("f2", StringType())]) +), StructType([StructField("f1", LongType()), StructField("f2", StringType())])) +with self.assertRaisesRegexp(TypeError, r'structField\("f1"\)'): --- End diff -- nit: We don't need `r` prefix for each regex for `assertRaisesRegexp`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19815: [SPARK-22602][SQL] remove ColumnVector#loadBytes
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19815
[GitHub] spark issue #19815: [SPARK-22602][SQL] remove ColumnVector#loadBytes
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19815 Thanks! Merged to master.
[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/11994#discussion_r153110760 --- Diff: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala --- @@ -195,18 +196,26 @@ private[spark] class MetricsSystem private ( val classPath = kv._2.getProperty("class") if (null != classPath) { try { - val sink = Utils.classForName(classPath) -.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) -.newInstance(kv._2, registry, securityMgr) + val sink = Utils.classForName(classPath).getConstructor( +classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) + .newInstance(kv._2, registry, securityMgr) if (kv._1 == "servlet") { metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) } else { sinks += sink.asInstanceOf[Sink] } } catch { - case e: Exception => -logError("Sink class " + classPath + " cannot be instantiated") -throw e + case _: NoSuchMethodException => +try { + sinks += Utils.classForName(classPath) --- End diff -- No, not necessary, `MetricsServlet` is a built-in metrics sink which will be explicitly added in the above code. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
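The instantiation pattern under discussion, as a self-contained sketch: try the richer constructor first and fall back to a simpler one on `NoSuchMethodException`. The sink classes and constructor shapes below are placeholders for illustration, not Spark's actual sink signatures:

```scala
import java.util.Properties

// Placeholder sink types, for illustration only.
trait DemoSink
class LegacySink(props: Properties, extra: String) extends DemoSink
class SimpleSink(props: Properties) extends DemoSink

object SinkLoaderSketch {
  def instantiateSink(className: String, props: Properties, extra: String): DemoSink = {
    val cls = Class.forName(className)
    try {
      // Preferred: the constructor with the extra argument.
      cls.getConstructor(classOf[Properties], classOf[String])
        .newInstance(props, extra).asInstanceOf[DemoSink]
    } catch {
      case _: NoSuchMethodException =>
        // Fallback: the minimal constructor, mirroring the catch block in the diff above.
        cls.getConstructor(classOf[Properties])
          .newInstance(props).asInstanceOf[DemoSink]
    }
  }

  def main(args: Array[String]): Unit = {
    val sink = instantiateSink(classOf[SimpleSink].getName, new Properties(), "unused")
    println(sink.getClass.getSimpleName) // prints SimpleSink, built via the fallback path
  }
}
```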
[GitHub] spark issue #19815: [SPARK-22602][SQL] remove ColumnVector#loadBytes
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19815 Merged build finished. Test PASSed.
[GitHub] spark issue #19815: [SPARK-22602][SQL] remove ColumnVector#loadBytes
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19815 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84203/ Test PASSed.
[GitHub] spark issue #19815: [SPARK-22602][SQL] remove ColumnVector#loadBytes
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19815 **[Test build #84203 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84203/testReport)** for PR 19815 at commit [`5711bb2`](https://github.com/apache/spark/commit/5711bb20e9d598c8716d7d8636466ee82e623a20). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/11994 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84202/ Test PASSed.
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/11994 Merged build finished. Test PASSed.
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/11994 **[Test build #84202 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84202/testReport)** for PR 11994 at commit [`f360dac`](https://github.com/apache/spark/commit/f360dac77499d7d0eebc09528255922c9315fb31). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user sriramrajendiran commented on the issue: https://github.com/apache/spark/pull/16578 @felixcheung can you help? We are hoping to see it in the 2.3 release. A feature behind a default-disabled flag looks like a safe option.
[GitHub] spark issue #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19607 **[Test build #84205 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84205/testReport)** for PR 19607 at commit [`40a9735`](https://github.com/apache/spark/commit/40a9735b88deb85c08f618186daf9ed2152fc406).
[GitHub] spark issue #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19607 **[Test build #84204 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84204/testReport)** for PR 19607 at commit [`f92eae3`](https://github.com/apache/spark/commit/f92eae35767a766ad80ac576a67f521e365549c7).
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r153107748 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -997,6 +997,14 @@ object SQLConf { .intConf .createWithDefault(1) + val PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE = +buildConf("spark.sql.execution.pandas.respectSessionTimeZone") + .internal() + .doc("When true, make Pandas DataFrame with timestamp type respecting session local " + +"timezone when converting to/from Pandas DataFrame.") --- End diff -- Sure, I'll update it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19607#discussion_r153107765 --- Diff: python/setup.py --- @@ -201,7 +201,7 @@ def _supports_symlinks(): extras_require={ 'ml': ['numpy>=1.7'], 'mllib': ['numpy>=1.7'], -'sql': ['pandas>=0.13.0'] +'sql': ['pandas>=0.19.2'] --- End diff -- Sure, I'll add it.
[GitHub] spark issue #19815: [SPARK-22602][SQL] remove ColumnVector#loadBytes
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19815 LGTM pending Jenkins.
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/11994 Merged build finished. Test FAILed.
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/11994 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84201/ Test FAILed.
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/11994 **[Test build #84201 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84201/testReport)** for PR 11994 at commit [`38bc2a4`](https://github.com/apache/spark/commit/38bc2a43a69ffb0b40dc5ebb0a60568225eeb798). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19752#discussion_r153101065 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- @@ -211,111 +231,73 @@ abstract class CaseWhenBase( val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("") "CASE" + cases + elseCase + " END" } -} - - -/** - * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END". - * When a = true, returns b; when c = true, returns d; else returns e. - * - * @param branches seq of (branch condition, branch value) - * @param elseValue optional value for the else branch - */ -// scalastyle:off line.size.limit -@ExpressionDescription( - usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, returns `expr4`; else returns `expr5`.", - arguments = """ -Arguments: - * expr1, expr3 - the branch condition expressions should all be boolean type. - * expr2, expr4, expr5 - the branch value expressions and else value expression should all be - same type or coercible to a common type. - """, - examples = """ -Examples: - > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END; - 1 - > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END; - 2 - > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END; - NULL - """) -// scalastyle:on line.size.limit -case class CaseWhen( -val branches: Seq[(Expression, Expression)], -val elseValue: Option[Expression] = None) - extends CaseWhenBase(branches, elseValue) with CodegenFallback with Serializable { - - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -super[CodegenFallback].doGenCode(ctx, ev) - } - - def toCodegen(): CaseWhenCodegen = { -CaseWhenCodegen(branches, elseValue) - } -} - -/** - * CaseWhen expression used when code generation condition is satisfied. - * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen. - * - * @param branches seq of (branch condition, branch value) - * @param elseValue optional value for the else branch - */ -case class CaseWhenCodegen( -val branches: Seq[(Expression, Expression)], -val elseValue: Option[Expression] = None) - extends CaseWhenBase(branches, elseValue) with Serializable { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -// Generate code that looks like: --- End diff -- shall we keep this comment and update it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19752 LGTM except a few minor comments
[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19752#discussion_r153100483 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- @@ -211,111 +231,73 @@ abstract class CaseWhenBase( val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("") "CASE" + cases + elseCase + " END" } -} - - -/** - * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END". - * When a = true, returns b; when c = true, returns d; else returns e. - * - * @param branches seq of (branch condition, branch value) - * @param elseValue optional value for the else branch - */ -// scalastyle:off line.size.limit -@ExpressionDescription( - usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, returns `expr4`; else returns `expr5`.", - arguments = """ -Arguments: - * expr1, expr3 - the branch condition expressions should all be boolean type. - * expr2, expr4, expr5 - the branch value expressions and else value expression should all be - same type or coercible to a common type. - """, - examples = """ -Examples: - > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END; - 1 - > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END; - 2 - > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END; - NULL - """) -// scalastyle:on line.size.limit -case class CaseWhen( -val branches: Seq[(Expression, Expression)], -val elseValue: Option[Expression] = None) - extends CaseWhenBase(branches, elseValue) with CodegenFallback with Serializable { - - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -super[CodegenFallback].doGenCode(ctx, ev) - } - - def toCodegen(): CaseWhenCodegen = { -CaseWhenCodegen(branches, elseValue) - } -} - -/** - * CaseWhen expression used when code generation condition is satisfied. - * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen. - * - * @param branches seq of (branch condition, branch value) - * @param elseValue optional value for the else branch - */ -case class CaseWhenCodegen( -val branches: Seq[(Expression, Expression)], -val elseValue: Option[Expression] = None) - extends CaseWhenBase(branches, elseValue) with Serializable { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -// Generate code that looks like: -// -// condA = ... -// if (condA) { -// valueA -// } else { -// condB = ... -// if (condB) { -// valueB -// } else { -// condC = ... -// if (condC) { -// valueC -// } else { -// elseValue -// } -// } -// } +// This variable represents whether the first successful condition is met or not. +// It is initialized to `false` and it is set to `true` when the first condition which +// evaluates to `true` is met and therefore is not needed to go on anymore on the computation +// of the following conditions. 
+val conditionMet = ctx.freshName("caseWhenConditionMet") +ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull, "") +ctx.addMutableState(ctx.javaType(dataType), ev.value, "") val cases = branches.map { case (condExpr, valueExpr) => val cond = condExpr.genCode(ctx) val res = valueExpr.genCode(ctx) s""" -${cond.code} -if (!${cond.isNull} && ${cond.value}) { - ${res.code} - ${ev.isNull} = ${res.isNull}; - ${ev.value} = ${res.value}; +if(!$conditionMet) { + ${cond.code} + if (!${cond.isNull} && ${cond.value}) { +${res.code} +${ev.isNull} = ${res.isNull}; +${ev.value} = ${res.value}; +$conditionMet = true; + } } """ } -var generatedCode = cases.mkString("", "\nelse {\n", "\nelse {\n") - -elseValue.foreach { elseExpr => +val elseCode = elseValue.map { elseExpr => val res = elseExpr.genCode(ctx) - generatedCode += -s""" + s""" +if(!$conditionMet) { ${res.code} ${ev.isNull} = ${res.isNull}; ${ev.value} = ${res.value}; -""" +} + """ } -generatedCode += "}\n" * cases.size +val
[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19752#discussion_r153100103 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- @@ -211,111 +231,73 @@ abstract class CaseWhenBase( val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("") "CASE" + cases + elseCase + " END" } -} - - -/** - * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END". - * When a = true, returns b; when c = true, returns d; else returns e. - * - * @param branches seq of (branch condition, branch value) - * @param elseValue optional value for the else branch - */ -// scalastyle:off line.size.limit -@ExpressionDescription( - usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, returns `expr4`; else returns `expr5`.", - arguments = """ -Arguments: - * expr1, expr3 - the branch condition expressions should all be boolean type. - * expr2, expr4, expr5 - the branch value expressions and else value expression should all be - same type or coercible to a common type. - """, - examples = """ -Examples: - > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END; - 1 - > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END; - 2 - > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END; - NULL - """) -// scalastyle:on line.size.limit -case class CaseWhen( -val branches: Seq[(Expression, Expression)], -val elseValue: Option[Expression] = None) - extends CaseWhenBase(branches, elseValue) with CodegenFallback with Serializable { - - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -super[CodegenFallback].doGenCode(ctx, ev) - } - - def toCodegen(): CaseWhenCodegen = { -CaseWhenCodegen(branches, elseValue) - } -} - -/** - * CaseWhen expression used when code generation condition is satisfied. - * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen. - * - * @param branches seq of (branch condition, branch value) - * @param elseValue optional value for the else branch - */ -case class CaseWhenCodegen( -val branches: Seq[(Expression, Expression)], -val elseValue: Option[Expression] = None) - extends CaseWhenBase(branches, elseValue) with Serializable { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -// Generate code that looks like: -// -// condA = ... -// if (condA) { -// valueA -// } else { -// condB = ... -// if (condB) { -// valueB -// } else { -// condC = ... -// if (condC) { -// valueC -// } else { -// elseValue -// } -// } -// } +// This variable represents whether the first successful condition is met or not. +// It is initialized to `false` and it is set to `true` when the first condition which +// evaluates to `true` is met and therefore is not needed to go on anymore on the computation +// of the following conditions. +val conditionMet = ctx.freshName("caseWhenConditionMet") +ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull, "") --- End diff -- nit: `ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull)`, as empty string is the default value of the 3rd parameter. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19752 LGTM cc @cloud-fan @kiszk
[GitHub] spark issue #19714: [SPARK-22489][SQL] Shouldn't change broadcast join build...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19714 Just out of curiosity, what if users ask to broadcast both join sides in the hint? Shall we throw an exception, or pick the smaller side to broadcast according to the stats? BTW, this is a behavior change; although it's more reasonable than before, we need to document it.
[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19821#discussion_r153099135 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -785,13 +785,36 @@ class CodegenContext { * @param expressions the codes to evaluate expressions. */ def splitExpressions(row: String, expressions: Seq[String]): String = { -if (row == null || currentVars != null) { +if (INPUT_ROW == null || currentVars != null) { // Cannot split these expressions because they are not created from a row object. return expressions.mkString("\n") } splitExpressions(expressions, funcName = "apply", arguments = ("InternalRow", row) :: Nil) } + /** + * Splits the generated code of expressions into multiple functions, because function has + * 64kb code size limit in JVM. This version takes care of INPUT_ROW and currentVars + * + * @param expressions the codes to evaluate expressions. + * @param funcName the split function name base. + * @param argumentsExceptRow the list of (type, name) of the arguments of the split function + * except for ctx.INPUT_ROW + */ + def splitExpressions( + expressions: Seq[String], + funcName: String, + argumentsExceptRow: Seq[(String, String)]): String = { --- End diff -- Could we combine them in the same function? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19821#discussion_r153098738 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -785,13 +785,36 @@ class CodegenContext { * @param expressions the codes to evaluate expressions. */ def splitExpressions(row: String, expressions: Seq[String]): String = { -if (row == null || currentVars != null) { +if (INPUT_ROW == null || currentVars != null) { // Cannot split these expressions because they are not created from a row object. return expressions.mkString("\n") } splitExpressions(expressions, funcName = "apply", arguments = ("InternalRow", row) :: Nil) } + /** + * Splits the generated code of expressions into multiple functions, because function has + * 64kb code size limit in JVM. This version takes care of INPUT_ROW and currentVars + * + * @param expressions the codes to evaluate expressions. + * @param funcName the split function name base. + * @param argumentsExceptRow the list of (type, name) of the arguments of the split function + * except for ctx.INPUT_ROW + */ + def splitExpressions( + expressions: Seq[String], + funcName: String, + argumentsExceptRow: Seq[(String, String)]): String = { +if (INPUT_ROW == null || currentVars != null) { + // Cannot split these expressions because they are not created from a row object. + return expressions.mkString("\n") --- End diff -- If-else looks better. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/11994#discussion_r153098194 --- Diff: core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala --- @@ -25,27 +25,29 @@ import com.codahale.metrics.{ConsoleReporter, MetricRegistry} import org.apache.spark.SecurityManager import org.apache.spark.metrics.MetricsSystem -private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry, -securityMgr: SecurityManager) extends Sink { +private[spark] class ConsoleSink( +property: Properties, +registry: MetricRegistry, +securityMgr: SecurityManager) extends Sink(property, registry) { val CONSOLE_DEFAULT_PERIOD = 10 val CONSOLE_DEFAULT_UNIT = "SECONDS" val CONSOLE_KEY_PERIOD = "period" val CONSOLE_KEY_UNIT = "unit" - val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match { + private val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match { --- End diff -- Hmm... this logic seems quite similar across the different Sinks; could we abstract it into the `Sink` class, or extract a trait for it?
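One possible shape for the abstraction suggested above, as a hypothetical sketch (this trait does not exist in Spark; names and defaults are assumptions): the shared period/unit parsing moves into a mix-in so each sink only declares its defaults:

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

// Hypothetical mix-in: shared parsing of the "period" and "unit" sink properties.
trait PollingSinkConfig {
  def property: Properties
  def defaultPeriod: Int = 10
  def defaultUnit: String = "SECONDS"

  final def pollPeriod: Int =
    Option(property.getProperty("period")).map(_.toInt).getOrElse(defaultPeriod)

  final def pollUnit: TimeUnit =
    TimeUnit.valueOf(Option(property.getProperty("unit")).getOrElse(defaultUnit).toUpperCase)
}
```

A sink could then mix it in, e.g. `class ConsoleSink(val property: Properties, ...) extends Sink(...) with PollingSinkConfig`, instead of repeating the `Option(...)` match blocks shown in the diff.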
[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/11994#discussion_r153097744 --- Diff: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala --- @@ -195,18 +196,26 @@ private[spark] class MetricsSystem private ( val classPath = kv._2.getProperty("class") if (null != classPath) { try { - val sink = Utils.classForName(classPath) -.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) -.newInstance(kv._2, registry, securityMgr) + val sink = Utils.classForName(classPath).getConstructor( +classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) + .newInstance(kv._2, registry, securityMgr) if (kv._1 == "servlet") { metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) } else { sinks += sink.asInstanceOf[Sink] } } catch { - case e: Exception => -logError("Sink class " + classPath + " cannot be instantiated") -throw e + case _: NoSuchMethodException => +try { + sinks += Utils.classForName(classPath) --- End diff -- Do we have to handle the case when `kv._1 == "servlet"` here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/11994#discussion_r153097850 --- Diff: core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala --- @@ -25,27 +25,29 @@ import com.codahale.metrics.{ConsoleReporter, MetricRegistry} import org.apache.spark.SecurityManager import org.apache.spark.metrics.MetricsSystem -private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry, -securityMgr: SecurityManager) extends Sink { +private[spark] class ConsoleSink( --- End diff -- nit: add comment for each param here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/11994#discussion_r153097471 --- Diff: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala --- @@ -195,18 +196,26 @@ private[spark] class MetricsSystem private ( val classPath = kv._2.getProperty("class") if (null != classPath) { try { - val sink = Utils.classForName(classPath) -.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) -.newInstance(kv._2, registry, securityMgr) + val sink = Utils.classForName(classPath).getConstructor( +classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) --- End diff -- Why make this format change? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/11994#discussion_r153098545 --- Diff: core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala --- @@ -17,8 +17,37 @@ package org.apache.spark.metrics.sink -private[spark] trait Sink { +import java.util.Properties + +import com.codahale.metrics.MetricRegistry + +import org.apache.spark.annotation.DeveloperApi + +/** + * :: DeveloperApi :: + * The abstract class of metrics Sink, by achiving the methods and registered through metrics + * .properties user could register customer metrics Sink into MetricsSystem. + * + * @param properties Properties related this specific Sink, properties are read from + * configuration file, user could define their own configurations and get + * from this parameter. + * @param metricRegistry The MetricRegistry for you to dump the collected metrics. + */ +@DeveloperApi +abstract class Sink(properties: Properties, metricRegistry: MetricRegistry) { + + /** + * Start this metrics Sink, this will be called by MetricsSystem --- End diff -- Do we have to define the behavior when start failed? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
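For context, a user-defined sink against the proposed abstract class might look roughly like the sketch below. The class, and the assumption that `start`/`stop`/`report` stay as in the existing `Sink` trait, are illustrative only; the review question is whether a failure inside `start()` should propagate to `MetricsSystem` or be handled by the sink itself.

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

// Illustrative user-defined sink against the proposed developer API; not part of the PR.
class MyConsoleSink(properties: Properties, metricRegistry: MetricRegistry)
  extends Sink(properties, metricRegistry) {

  private val reporter = ConsoleReporter.forRegistry(metricRegistry).build()

  override def start(): Unit = {
    // If this throws (e.g. bad configuration), what is MetricsSystem expected to do?
    reporter.start(10, TimeUnit.SECONDS)
  }

  override def stop(): Unit = reporter.stop()

  override def report(): Unit = reporter.report()
}
```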
[GitHub] spark issue #19813: [WIP][SPARK-22600][SQL] Fix 64kb limit for deeply nested...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19813 If we have a clear rule, I think it makes more sense to do this in `CodegenContext`, i.e. having a `def splitExpressions(expressions: Seq[String]): String`, which automatically extracts the current inputs and puts them into the parameter list. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
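A sketch of the suggested one-argument API, purely for illustration (not Spark's actual code). `splitIntoFunctions` is an assumed helper, and turning `currentVars` into extra parameters is elided here; that is exactly the part the "clear rule" would have to define:

```scala
// Hypothetical sketch of the suggested API, not the real implementation.
def splitExpressions(expressions: Seq[String]): String = {
  if (INPUT_ROW == null || currentVars != null) {
    // Without a rule for passing bound variables along, these expressions
    // cannot be moved into separate functions.
    expressions.mkString("\n")
  } else {
    // The context already knows its current input, so it can build the split
    // functions' parameter list itself instead of every caller assembling it.
    splitIntoFunctions(expressions, "apply", ("InternalRow", INPUT_ROW) :: Nil)
  }
}
```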
[GitHub] spark issue #19714: [SPARK-22489][SQL] Shouldn't change broadcast join build...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19714 We also need to update the comment of `JoinSelection`. cc @liufengdb @cloud-fan @rxin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19815: [SPARK-22602][SQL] remove ColumnVector#loadBytes
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19815 **[Test build #84203 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84203/testReport)** for PR 19815 at commit [`5711bb2`](https://github.com/apache/spark/commit/5711bb20e9d598c8716d7d8636466ee82e623a20). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19714#discussion_r153098067 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala --- @@ -223,4 +223,36 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { assert(HashJoin.rewriteKeyExpr(l :: ss :: Nil) === l :: ss :: Nil) assert(HashJoin.rewriteKeyExpr(i :: ss :: Nil) === i :: ss :: Nil) } + + test("Shouldn't change broadcast join buildSide if user clearly specified") { +spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value").createTempView("table1") +spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value").createTempView("table2") + +def assertJoinBuildSide(pair: (String, BuildSide)): Any = { + val (sqlString, s) = pair + val df = sql(sqlString) + val physical = df.queryExecution.executedPlan + physical match { --- End diff -- Instead of doing `match`, can you just try to call `collect` and assert the result is 1? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
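One possible reading of that suggestion, sketched below (hypothetical, not the PR's final test code): collect the broadcast join node out of the physical plan instead of pattern matching on the whole plan shape, then assert directly on its build side.

```scala
// Sketch of the suggested simplification inside the test suite; assumes the usual
// imports for BuildSide and BroadcastHashJoinExec from org.apache.spark.sql.execution.joins.
def assertJoinBuildSide(sqlString: String, expected: BuildSide): Unit = {
  val physical = sql(sqlString).queryExecution.executedPlan
  // TreeNode.collect gathers every broadcast hash join in the plan.
  val broadcastJoins = physical.collect { case j: BroadcastHashJoinExec => j }
  assert(broadcastJoins.size === 1)
  assert(broadcastJoins.head.buildSide === expected)
}
```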
[GitHub] spark pull request #19818: [SPARK-22604][SQL] remove the get address methods...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19818#discussion_r153096313 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java --- @@ -62,13 +62,6 @@ */ public abstract boolean anyNullsSet(); - /** - * Returns the off heap ptr for the arrays backing the NULLs and values buffer. Only valid - * to call for off heap columns. --- End diff -- good idea --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19818: [SPARK-22604][SQL] remove the get address methods...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19818#discussion_r153096296 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java --- @@ -73,12 +75,12 @@ public OffHeapColumnVector(int capacity, DataType type) { reset(); } - @Override + @VisibleForTesting public long valuesNativeAddress() { return data; } - @Override + @VisibleForTesting --- End diff -- they are called in benchmark --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19821#discussion_r153095567 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -785,13 +785,36 @@ class CodegenContext { * @param expressions the codes to evaluate expressions. */ def splitExpressions(row: String, expressions: Seq[String]): String = { -if (row == null || currentVars != null) { +if (INPUT_ROW == null || currentVars != null) { // Cannot split these expressions because they are not created from a row object. return expressions.mkString("\n") } splitExpressions(expressions, funcName = "apply", arguments = ("InternalRow", row) :: Nil) } + /** + * Splits the generated code of expressions into multiple functions, because function has + * 64kb code size limit in JVM. This version takes care of INPUT_ROW and currentVars + * + * @param expressions the codes to evaluate expressions. + * @param funcName the split function name base. + * @param argumentsExceptRow the list of (type, name) of the arguments of the split function + * except for ctx.INPUT_ROW + */ + def splitExpressions( + expressions: Seq[String], + funcName: String, + argumentsExceptRow: Seq[(String, String)]): String = { --- End diff -- I agree that it is good from the point of view of consistency. One question, though: if we use the same argument name `arguments`, will developers still be able to distinguish this `splitExpressions` from the richer `splitExpressions` below when they only want to pass the three arguments `expressions`, `funcName`, and `arguments`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/11994 **[Test build #84202 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84202/testReport)** for PR 11994 at commit [`f360dac`](https://github.com/apache/spark/commit/f360dac77499d7d0eebc09528255922c9315fb31). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19814: [SPARK-22484][DOC] Document PySpark DataFrame csv writer...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19814 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84200/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19814: [SPARK-22484][DOC] Document PySpark DataFrame csv writer...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19814 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19814: [SPARK-22484][DOC] Document PySpark DataFrame csv writer...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19814 **[Test build #84200 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84200/testReport)** for PR 19814 at commit [`821ee19`](https://github.com/apache/spark/commit/821ee1993e26289bd5cc93be14543a8dbe913ff8). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19764: [SPARK-22539][SQL] Add second order for rangepartitioner...
Github user caneGuy commented on the issue: https://github.com/apache/spark/pull/19764 Ping, could any admin help review this? Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/11994 **[Test build #84201 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84201/testReport)** for PR 11994 at commit [`38bc2a4`](https://github.com/apache/spark/commit/38bc2a43a69ffb0b40dc5ebb0a60568225eeb798). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17520: [WIP][SPARK-19712][SQL] Move PullupCorrelatedPredicates ...
Github user wangyum commented on the issue: https://github.com/apache/spark/pull/17520 @nsyca Can you resolve conflicts? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19717: [SPARK-18278] [Submission] Spark on Kubernetes - basic s...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/19717 +CC @srowen --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19821#discussion_r153092301 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -785,13 +785,36 @@ class CodegenContext { * @param expressions the codes to evaluate expressions. */ def splitExpressions(row: String, expressions: Seq[String]): String = { -if (row == null || currentVars != null) { +if (INPUT_ROW == null || currentVars != null) { --- End diff -- good catch, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19821#discussion_r153092158 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -785,13 +785,36 @@ class CodegenContext { * @param expressions the codes to evaluate expressions. */ def splitExpressions(row: String, expressions: Seq[String]): String = { -if (row == null || currentVars != null) { +if (INPUT_ROW == null || currentVars != null) { // Cannot split these expressions because they are not created from a row object. return expressions.mkString("\n") } splitExpressions(expressions, funcName = "apply", arguments = ("InternalRow", row) :: Nil) } + /** + * Splits the generated code of expressions into multiple functions, because function has + * 64kb code size limit in JVM. This version takes care of INPUT_ROW and currentVars + * + * @param expressions the codes to evaluate expressions. + * @param funcName the split function name base. + * @param argumentsExceptRow the list of (type, name) of the arguments of the split function + * except for ctx.INPUT_ROW + */ + def splitExpressions( + expressions: Seq[String], + funcName: String, + argumentsExceptRow: Seq[(String, String)]): String = { +if (INPUT_ROW == null || currentVars != null) { + // Cannot split these expressions because they are not created from a row object. + return expressions.mkString("\n") --- End diff -- I followed the existing `splitExpressions` above in using `return`. Would it be better to write it here as `if (...) { expressions.mkString("\n") } else { splitExpressions(...) }`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19823: [SPARK-22601][SQL] Data load is getting displayed...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19823#discussion_r153091293 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -341,6 +341,12 @@ case class LoadDataCommand( } else { val uri = new URI(path) if (uri.getScheme() != null && uri.getAuthority() != null) { + val hadoopConf = sparkSession.sessionState.newHadoopConf() + val srcPath = new Path(path) --- End diff -- nit: Let's use `new Path(uri)`. I think we better validate `uri` in this scope. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
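A hedged sketch of the validation being discussed, building on the variables visible in the quoted diff (`uri`, `path`, `sparkSession`); the exact checks and the error message are assumptions, not the PR's final change:

```scala
// Illustrative only: build the Path from the already-parsed URI and fail early
// if the source does not exist on that filesystem.
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val srcPath = new Path(uri)
val fs = srcPath.getFileSystem(hadoopConf)
if (!fs.exists(srcPath)) {
  throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
}
```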
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r153087493 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala --- @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.util.Utils + +/** + * A factory class for configuring and creating executor pods. + */ +private[spark] trait ExecutorPodFactory { + + /** + * Configure and construct an executor pod with the given parameters. + */ + def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod +} + +private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) + extends ExecutorPodFactory { + + import ExecutorPodFactoryImpl._ + + private val executorExtraClasspath = +sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + + private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( +sparkConf, +KUBERNETES_EXECUTOR_LABEL_PREFIX) + require( +!executorLabels.contains(SPARK_APP_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") + require( +!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + + " Spark.") + require( +!executorLabels.contains(SPARK_ROLE_LABEL), +s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.") + + private val executorAnnotations = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) + private val nodeSelector = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_NODE_SELECTOR_PREFIX) + + private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE) + private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) + private val blockManagerPort = sparkConf +.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + + private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + + private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + private val executorMemoryString = sparkConf.get( 
+org.apache.spark.internal.config.EXECUTOR_MEMORY.key, +org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) + + private val memoryOverheadMiB = sparkConf +.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) +.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt, + MEMORY_OVERHEAD_MIN_MIB)) + private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB + + private val executorCores = sparkConf.getDouble("spark.executor.cores", 1) + private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES) + + override def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod = { +val name =
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r153088979 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.io.File + +import io.fabric8.kubernetes.client.Config + +import org.apache.spark.SparkContext +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} +import org.apache.spark.util.ThreadUtils + +private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging { + + override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s") + + override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { +new TaskSchedulerImpl(sc) + } + + override def createSchedulerBackend( + sc: SparkContext, + masterURL: String, + scheduler: TaskScheduler): SchedulerBackend = { +val sparkConf = sc.getConf + +val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient( + KUBERNETES_MASTER_INTERNAL_URL, + Some(sparkConf.get(KUBERNETES_NAMESPACE)), + APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX, + sparkConf, + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) + +val executorPodFactory = new ExecutorPodFactoryImpl(sparkConf) +val allocatorExecutor = ThreadUtils + .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") +val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool( + "kubernetes-executor-requests") --- End diff -- Why is this service passed from outside ? It is an impl detail of `KubernetesClusterSchedulerBackend` and should be initialized/managed within (create in constructor, shutdown in stop) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
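What owning the pool inside the backend might look like, as a rough sketch (field and method names are assumptions, not the actual PR code):

```scala
// Hypothetical sketch: the scheduler backend creates its request-executor pool
// itself instead of receiving it through the constructor, and tears it down in stop().
private val requestExecutorsService =
  ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")

private implicit val requestExecutorContext =
  ExecutionContext.fromExecutorService(requestExecutorsService)

override def stop(): Unit = {
  // Shut the pool down together with the backend so nothing leaks.
  requestExecutorsService.shutdownNow()
  super.stop()
}
```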
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r153088232 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala --- @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.util.Utils + +/** + * A factory class for configuring and creating executor pods. + */ +private[spark] trait ExecutorPodFactory { + + /** + * Configure and construct an executor pod with the given parameters. + */ + def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod +} + +private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) + extends ExecutorPodFactory { + + import ExecutorPodFactoryImpl._ + + private val executorExtraClasspath = +sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + + private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( +sparkConf, +KUBERNETES_EXECUTOR_LABEL_PREFIX) + require( +!executorLabels.contains(SPARK_APP_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") + require( +!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + + " Spark.") + require( +!executorLabels.contains(SPARK_ROLE_LABEL), +s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.") + + private val executorAnnotations = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) + private val nodeSelector = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_NODE_SELECTOR_PREFIX) + + private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE) + private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) --- End diff -- Is this required ? IIRC as of 2.0 the config was removed/is not longer used. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r153091143 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.io.Closeable +import java.net.InetAddress +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} +import javax.annotation.concurrent.GuardedBy + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} + +import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv} +import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} +import org.apache.spark.util.Utils + +private[spark] class KubernetesClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +rpcEnv: RpcEnv, +executorPodFactory: ExecutorPodFactory, +kubernetesClient: KubernetesClient, +allocatorExecutor: ScheduledExecutorService, +requestExecutorsService: ExecutorService) + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + + import KubernetesClusterSchedulerBackend._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + private val RUNNING_EXECUTOR_PODS_LOCK = new Object + @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK") + private val runningExecutorsToPods = new mutable.HashMap[String, Pod] + private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]() + private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]() + private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]() + + private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( +requestExecutorsService) + + private val driverPod = kubernetesClient.pods() +.inNamespace(kubernetesNamespace) +.withName(kubernetesDriverPodName) +.get() + + protected override val minRegisteredRatio = +if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { + 0.8 +} else { + super.minRegisteredRatio +} + + private val executorWatchResource = new AtomicReference[Closeable] + private val totalExpectedExecutors = new AtomicInteger(0) + + private val driverUrl = RpcEndpointAddress( +conf.get("spark.driver.host"), +conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), +CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + + private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf) + + private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val allocatorRunnable = new Runnable { + +// Maintains a map of executor id to count of checks performed to learn the loss reason +// for an executor. +private val executorReasonCheckAttemptCounts
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r153084210 --- Diff: resource-managers/kubernetes/core/pom.xml --- @@ -0,0 +1,94 @@ + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + 4.0.0 + +org.apache.spark +spark-parent_2.11 +2.3.0-SNAPSHOT +../../../pom.xml + + + spark-kubernetes_2.11 + jar + Spark Project Kubernetes + +kubernetes +3.0.0 --- End diff -- Last time I asked about the client version, there were concerns regarding the maturity/stability of 3.x compared to the 2.2 (iirc) version which was in use - were they resolved? I obviously prefer moving to the latest major version, but want to understand the risks involved, if any. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r153087234 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala --- @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.util.Utils + +/** + * A factory class for configuring and creating executor pods. + */ +private[spark] trait ExecutorPodFactory { + + /** + * Configure and construct an executor pod with the given parameters. + */ + def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod +} + +private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) + extends ExecutorPodFactory { + + import ExecutorPodFactoryImpl._ + + private val executorExtraClasspath = +sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + + private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( +sparkConf, +KUBERNETES_EXECUTOR_LABEL_PREFIX) + require( +!executorLabels.contains(SPARK_APP_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") + require( +!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + + " Spark.") + require( +!executorLabels.contains(SPARK_ROLE_LABEL), +s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.") + + private val executorAnnotations = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) + private val nodeSelector = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_NODE_SELECTOR_PREFIX) + + private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE) + private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) + private val blockManagerPort = sparkConf +.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + + private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + + private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + private val executorMemoryString = sparkConf.get( 
+org.apache.spark.internal.config.EXECUTOR_MEMORY.key, +org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) + + private val memoryOverheadMiB = sparkConf +.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) +.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt, + MEMORY_OVERHEAD_MIN_MIB)) + private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB + + private val executorCores = sparkConf.getDouble("spark.executor.cores", 1) + private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES) + + override def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod = { +val name =
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r153089366 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.io.Closeable +import java.net.InetAddress +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} +import javax.annotation.concurrent.GuardedBy + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} + +import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv} +import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} +import org.apache.spark.util.Utils + +private[spark] class KubernetesClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +rpcEnv: RpcEnv, +executorPodFactory: ExecutorPodFactory, +kubernetesClient: KubernetesClient, +allocatorExecutor: ScheduledExecutorService, +requestExecutorsService: ExecutorService) + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + + import KubernetesClusterSchedulerBackend._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + private val RUNNING_EXECUTOR_PODS_LOCK = new Object + @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK") + private val runningExecutorsToPods = new mutable.HashMap[String, Pod] + private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]() + private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]() + private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]() + + private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( +requestExecutorsService) + + private val driverPod = kubernetesClient.pods() +.inNamespace(kubernetesNamespace) +.withName(kubernetesDriverPodName) +.get() + + protected override val minRegisteredRatio = +if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { + 0.8 +} else { + super.minRegisteredRatio +} + + private val executorWatchResource = new AtomicReference[Closeable] + private val totalExpectedExecutors = new AtomicInteger(0) + + private val driverUrl = RpcEndpointAddress( +conf.get("spark.driver.host"), +conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), +CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + + private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf) + + private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val allocatorRunnable = new Runnable { + +// Maintains a map of executor id to count of checks performed to learn the loss reason +// for an executor. +private val executorReasonCheckAttemptCounts
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r153089805 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.io.Closeable +import java.net.InetAddress +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} +import javax.annotation.concurrent.GuardedBy + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} + +import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv} +import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} +import org.apache.spark.util.Utils + +private[spark] class KubernetesClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +rpcEnv: RpcEnv, +executorPodFactory: ExecutorPodFactory, +kubernetesClient: KubernetesClient, +allocatorExecutor: ScheduledExecutorService, +requestExecutorsService: ExecutorService) + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + + import KubernetesClusterSchedulerBackend._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + private val RUNNING_EXECUTOR_PODS_LOCK = new Object + @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK") + private val runningExecutorsToPods = new mutable.HashMap[String, Pod] + private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]() + private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]() + private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]() + + private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( +requestExecutorsService) + + private val driverPod = kubernetesClient.pods() +.inNamespace(kubernetesNamespace) +.withName(kubernetesDriverPodName) +.get() + + protected override val minRegisteredRatio = +if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { + 0.8 +} else { + super.minRegisteredRatio +} + + private val executorWatchResource = new AtomicReference[Closeable] + private val totalExpectedExecutors = new AtomicInteger(0) + + private val driverUrl = RpcEndpointAddress( +conf.get("spark.driver.host"), +conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), +CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + + private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf) + + private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val allocatorRunnable = new Runnable { + +// Maintains a map of executor id to count of checks performed to learn the loss reason +// for an executor. +private val executorReasonCheckAttemptCounts
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r153089664 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.io.Closeable +import java.net.InetAddress +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} +import javax.annotation.concurrent.GuardedBy + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} + +import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv} +import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} +import org.apache.spark.util.Utils + +private[spark] class KubernetesClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +rpcEnv: RpcEnv, +executorPodFactory: ExecutorPodFactory, +kubernetesClient: KubernetesClient, +allocatorExecutor: ScheduledExecutorService, +requestExecutorsService: ExecutorService) + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + + import KubernetesClusterSchedulerBackend._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + private val RUNNING_EXECUTOR_PODS_LOCK = new Object + @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK") + private val runningExecutorsToPods = new mutable.HashMap[String, Pod] + private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]() + private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]() + private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]() + + private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( +requestExecutorsService) + + private val driverPod = kubernetesClient.pods() +.inNamespace(kubernetesNamespace) +.withName(kubernetesDriverPodName) +.get() + + protected override val minRegisteredRatio = +if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { + 0.8 +} else { + super.minRegisteredRatio +} + + private val executorWatchResource = new AtomicReference[Closeable] + private val totalExpectedExecutors = new AtomicInteger(0) + + private val driverUrl = RpcEndpointAddress( +conf.get("spark.driver.host"), +conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), +CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + + private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf) + + private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val allocatorRunnable = new Runnable { + +// Maintains a map of executor id to count of checks performed to learn the loss reason +// for an executor. +private val executorReasonCheckAttemptCounts
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r153087550 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.config._ +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.util.Utils + +/** + * Configures executor pods. Construct one of these with a SparkConf to set up properties that are + * common across all executors. Then, pass in dynamic parameters into createExecutorPod. + */ +private[spark] trait ExecutorPodFactory { + def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod +} + +private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) + extends ExecutorPodFactory { + + import ExecutorPodFactoryImpl._ + + private val executorExtraClasspath = sparkConf.get( +org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION) + + private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( +sparkConf, +KUBERNETES_EXECUTOR_LABEL_PREFIX, +"executor label") + require( +!executorLabels.contains(SPARK_APP_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") + require( +!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + + s" Spark.") + + private val executorAnnotations = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_EXECUTOR_ANNOTATION_PREFIX, + "executor annotation") + private val nodeSelector = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_NODE_SELECTOR_PREFIX, + "node selector") + + private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE) + private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) + private val blockmanagerPort = sparkConf +.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + private val kubernetesDriverPodName = sparkConf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val 
executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + + private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + private val executorMemoryString = sparkConf.get( +org.apache.spark.internal.config.EXECUTOR_MEMORY.key, +org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) + + private val memoryOverheadMiB = sparkConf +.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) +.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt, + MEMORY_OVERHEAD_MIN_MIB)) + private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB + + private val executorCores = sparkConf.getDouble("spark.executor.cores", 1d) + private val executorLimitCores = sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key) + + override def createExecutorPod( + executorId: String,
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r153084631 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import org.apache.spark.SPARK_VERSION +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.network.util.ByteUnit + +private[spark] object Config extends Logging { + + val KUBERNETES_NAMESPACE = +ConfigBuilder("spark.kubernetes.namespace") + .doc("The namespace that will be used for running the driver and executor pods. When using" + +" spark-submit in cluster mode, this can also be passed to spark-submit via the" + +" --kubernetes-namespace command line argument.") + .stringConf + .createWithDefault("default") + + val EXECUTOR_DOCKER_IMAGE = +ConfigBuilder("spark.kubernetes.executor.docker.image") + .doc("Docker image to use for the executors. Specify this using the standard Docker tag" + +" format.") + .stringConf + .createWithDefault(s"spark-executor:$SPARK_VERSION") --- End diff -- Should this default `spark-executor:$SPARK_VERSION` be present ? I would expect the image name to be always explicitly specified by admin (in spark-defaults) or overridden by user. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
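If the default were dropped, the config entry might look roughly like the sketch below, using `createOptional` so the executor image must be set explicitly (by the admin in spark-defaults.conf or by the user per job). This is a hypothetical alternative, not the PR's actual change:

```scala
// Hypothetical alternative: no default value, the setting becomes mandatory to supply.
val EXECUTOR_DOCKER_IMAGE =
  ConfigBuilder("spark.kubernetes.executor.docker.image")
    .doc("Docker image to use for the executors. Specify this using the standard " +
      "Docker tag format.")
    .stringConf
    .createOptional
```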
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r153089121 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.io.Closeable +import java.net.InetAddress +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} +import javax.annotation.concurrent.GuardedBy + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} + +import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv} +import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} +import org.apache.spark.util.Utils + +private[spark] class KubernetesClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +rpcEnv: RpcEnv, +executorPodFactory: ExecutorPodFactory, +kubernetesClient: KubernetesClient, +allocatorExecutor: ScheduledExecutorService, +requestExecutorsService: ExecutorService) + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + + import KubernetesClusterSchedulerBackend._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + private val RUNNING_EXECUTOR_PODS_LOCK = new Object + @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK") + private val runningExecutorsToPods = new mutable.HashMap[String, Pod] + private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]() + private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]() + private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]() + + private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( +requestExecutorsService) + + private val driverPod = kubernetesClient.pods() +.inNamespace(kubernetesNamespace) +.withName(kubernetesDriverPodName) +.get() + + protected override val minRegisteredRatio = +if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { --- End diff -- We need to update docs/configuration.md to specify this is used by both yarn and kubernetes now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r153084513 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import org.apache.spark.SPARK_VERSION +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.network.util.ByteUnit + +private[spark] object Config extends Logging { + + val KUBERNETES_NAMESPACE = +ConfigBuilder("spark.kubernetes.namespace") + .doc("The namespace that will be used for running the driver and executor pods. When using" + +" spark-submit in cluster mode, this can also be passed to spark-submit via the" + +" --kubernetes-namespace command line argument.") + .stringConf + .createWithDefault("default") + + val EXECUTOR_DOCKER_IMAGE = +ConfigBuilder("spark.kubernetes.executor.docker.image") + .doc("Docker image to use for the executors. Specify this using the standard Docker tag" + +" format.") + .stringConf + .createWithDefault(s"spark-executor:$SPARK_VERSION") + + val DOCKER_IMAGE_PULL_POLICY = +ConfigBuilder("spark.kubernetes.docker.image.pullPolicy") + .doc("Kubernetes image pull policy. Valid values are Always, Never, and IfNotPresent.") + .stringConf + .checkValues(Set("Always", "Never", "IfNotPresent")) + .createWithDefault("IfNotPresent") + + val APISERVER_AUTH_DRIVER_CONF_PREFIX = + "spark.kubernetes.authenticate.driver" + val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX = + "spark.kubernetes.authenticate.driver.mounted" + val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken" + val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile" + val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile" + val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile" + val CA_CERT_FILE_CONF_SUFFIX = "caCertFile" + + val KUBERNETES_SERVICE_ACCOUNT_NAME = +ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") + .doc("Service account that is used when running the driver pod. The driver pod uses" + +" this service account when requesting executor pods from the API server. If specific" + +" credentials are given for the driver pod to use, the driver will favor" + +" using those credentials instead.") + .stringConf + .createOptional + + // Note that while we set a default for this when we start up the + // scheduler, the specific default value is dynamically determined + // based on the executor memory. + val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD = +ConfigBuilder("spark.kubernetes.executor.memoryOverhead") + .doc("The amount of off-heap memory (in megabytes) to be allocated per executor. 
This" + +" is memory that accounts for things like VM overheads, interned strings, other native" + +" overheads, etc. This tends to grow with the executor size. (typically 6-10%).") + .bytesConf(ByteUnit.MiB) + .createOptional --- End diff -- What about driver memory overhead ? I see that mesos does not support it, while yarn does - is it relevant here ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r153090634 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala --- @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.util.Utils + +/** + * A factory class for configuring and creating executor pods. + */ +private[spark] trait ExecutorPodFactory { + + /** + * Configure and construct an executor pod with the given parameters. + */ + def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod +} + +private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) + extends ExecutorPodFactory { + + import ExecutorPodFactoryImpl._ + + private val executorExtraClasspath = +sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + + private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( +sparkConf, +KUBERNETES_EXECUTOR_LABEL_PREFIX) + require( +!executorLabels.contains(SPARK_APP_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") + require( +!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + + " Spark.") + require( +!executorLabels.contains(SPARK_ROLE_LABEL), +s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.") + + private val executorAnnotations = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) + private val nodeSelector = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_NODE_SELECTOR_PREFIX) + + private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE) + private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) + private val blockManagerPort = sparkConf +.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + + private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + + private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + private val executorMemoryString = sparkConf.get( 
+org.apache.spark.internal.config.EXECUTOR_MEMORY.key, +org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) + + private val memoryOverheadMiB = sparkConf +.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) +.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt, + MEMORY_OVERHEAD_MIN_MIB)) + private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB + + private val executorCores = sparkConf.getDouble("spark.executor.cores", 1) + private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES) + + override def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod = { +val name =
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/11994 @felixcheung thanks for your review. I don't think there is a next step; the current changes should be enough for users to externalize customized metrics sources and sinks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
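For context, externalizing a metrics source boils down to implementing the Source trait and exposing a Dropwizard MetricRegistry; a minimal sketch (the source name and counter are illustrative, and how the instance is registered is left to whatever mechanism this PR exposes):

```scala
import com.codahale.metrics.MetricRegistry
import org.apache.spark.metrics.source.Source

// Minimal custom metrics source; assumes the Source trait is publicly
// extendable, which is what this change is about.
class MyAppSource extends Source {
  override val sourceName: String = "myApp"
  override val metricRegistry: MetricRegistry = new MetricRegistry

  // Application code can increment this counter; it is reported through
  // whatever sinks are configured in metrics.properties.
  val recordsProcessed =
    metricRegistry.counter(MetricRegistry.name("myApp", "recordsProcessed"))
}
```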
[GitHub] spark pull request #19816: [SPARK-21693][R][FOLLOWUP] Reduce shuffle partiti...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19816 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19816: [SPARK-21693][R][FOLLOWUP] Reduce shuffle partitions run...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19816 Thanks @felixcheung. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19816: [SPARK-21693][R][FOLLOWUP] Reduce shuffle partitions run...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19816 Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r153089584 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala --- @@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver( override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { // The block is actually going to be a range of a single map output file for this map, so // find out the consolidated file, then the offset within that from our index +logDebug(s"Fetch block data for $blockId") val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) val in = new DataInputStream(Files.newInputStream(indexFile.toPath)) try { ByteStreams.skipFully(in, blockId.reduceId * 8) val offset = in.readLong() + ByteStreams.skipFully(in, (blockId.length - 1) * 8) --- End diff -- I get your point, thanks for the explanation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
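The change quoted above works because the shuffle index file stores cumulative offsets, so the end of a run of `length` consecutive reduce partitions sits `length` entries after its start. A self-contained sketch of that arithmetic on an in-memory offsets array (file I/O and stream skipping omitted):

```scala
object ContiguousRangeSketch {
  // Given cumulative offsets from a shuffle index file, a block covering
  // `length` consecutive reduce partitions starting at `reduceId` maps to
  // the single byte range [offsets(reduceId), offsets(reduceId + length)).
  def contiguousRange(offsets: Array[Long], reduceId: Int, length: Int): (Long, Long) =
    (offsets(reduceId), offsets(reduceId + length))

  def main(args: Array[String]): Unit = {
    // Partitions of 10, 20 and 30 bytes yield offsets [0, 10, 30, 60];
    // fetching partitions 1..2 as one contiguous block reads bytes [10, 60).
    val offsets = Array(0L, 10L, 30L, 60L)
    assert(contiguousRange(offsets, reduceId = 1, length = 2) == (10L, 60L))
  }
}
```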
[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19821#discussion_r153087888 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -785,13 +785,36 @@ class CodegenContext { * @param expressions the codes to evaluate expressions. */ def splitExpressions(row: String, expressions: Seq[String]): String = { -if (row == null || currentVars != null) { +if (INPUT_ROW == null || currentVars != null) { // Cannot split these expressions because they are not created from a row object. return expressions.mkString("\n") } splitExpressions(expressions, funcName = "apply", arguments = ("InternalRow", row) :: Nil) } + /** + * Splits the generated code of expressions into multiple functions, because function has + * 64kb code size limit in JVM. This version takes care of INPUT_ROW and currentVars + * + * @param expressions the codes to evaluate expressions. + * @param funcName the split function name base. + * @param argumentsExceptRow the list of (type, name) of the arguments of the split function + * except for ctx.INPUT_ROW + */ + def splitExpressions( + expressions: Seq[String], + funcName: String, + argumentsExceptRow: Seq[(String, String)]): String = { +if (INPUT_ROW == null || currentVars != null) { + // Cannot split these expressions because they are not created from a row object. + return expressions.mkString("\n") --- End diff -- Could we avoid using `return`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
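One way to drop the early `return` is to make the guard one branch of a single if/else expression whose value is the method result. A generic, runnable sketch of that shape (names and the splitting stand-in are illustrative, not the CodeGenerator internals):

```scala
object NoReturnSketch {
  // Same behaviour as a guarded early return, expressed as one if/else expression.
  def render(parts: Seq[String], canSplit: Boolean): String =
    if (!canSplit) {
      // Old `return` branch: emit everything inline.
      parts.mkString("\n")
    } else {
      // Stand-in for the real splitting logic: chunk the statements into helpers.
      parts.grouped(10).map(_.mkString("\n")).zipWithIndex
        .map { case (body, i) => s"private void apply_$i() {\n$body\n}" }
        .mkString("\n")
    }
}
```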
[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19821#discussion_r153087865 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -785,13 +785,36 @@ class CodegenContext { * @param expressions the codes to evaluate expressions. */ def splitExpressions(row: String, expressions: Seq[String]): String = { -if (row == null || currentVars != null) { +if (INPUT_ROW == null || currentVars != null) { // Cannot split these expressions because they are not created from a row object. return expressions.mkString("\n") } splitExpressions(expressions, funcName = "apply", arguments = ("InternalRow", row) :: Nil) } + /** + * Splits the generated code of expressions into multiple functions, because function has + * 64kb code size limit in JVM. This version takes care of INPUT_ROW and currentVars + * + * @param expressions the codes to evaluate expressions. + * @param funcName the split function name base. + * @param argumentsExceptRow the list of (type, name) of the arguments of the split function + * except for ctx.INPUT_ROW + */ + def splitExpressions( + expressions: Seq[String], + funcName: String, + argumentsExceptRow: Seq[(String, String)]): String = { --- End diff -- Let the caller also provide `ctx.INPUT_ROW`? Change it to ```Scala arguments: Seq[(String, String)] ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
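With the suggested `arguments: Seq[(String, String)]` signature, the existing row-based overload can simply prepend `("InternalRow", row)` and delegate. A simplified, runnable sketch of that call shape (the generated-function body is a placeholder, not the real code generation):

```scala
object SplitSignatureSketch {
  // General version: the caller supplies the full (type, name) argument list,
  // including the row argument if there is one.
  def splitExpressions(
      expressions: Seq[String],
      funcName: String,
      arguments: Seq[(String, String)]): String = {
    val params = arguments.map { case (tpe, name) => s"$tpe $name" }.mkString(", ")
    s"// would emit helper functions like: private void $funcName($params) { ... }"
  }

  // Row-based overload just prepends the row argument and delegates.
  def splitExpressions(row: String, expressions: Seq[String]): String =
    splitExpressions(expressions, funcName = "apply", arguments = ("InternalRow", row) :: Nil)
}
```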
[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19752 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84199/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19752 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19821: [WIP][SPARK-22608][SQL] add new API to CodeGenera...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19821#discussion_r153087808 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -785,13 +785,36 @@ class CodegenContext { * @param expressions the codes to evaluate expressions. */ def splitExpressions(row: String, expressions: Seq[String]): String = { --- End diff -- To make it consistent, how about changing it to ```Scala def splitExpressions(row: String, arguments: Seq[(String, String)]): String = { ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19752 **[Test build #84199 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84199/testReport)** for PR 19752 at commit [`9063583`](https://github.com/apache/spark/commit/9063583bce77348b9da61abec6e9fb5ae7aef117). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19752 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84198/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19752 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19752: [SPARK-22520][SQL] Support code generation for large Cas...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19752 **[Test build #84198 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84198/testReport)** for PR 19752 at commit [`f9c20be`](https://github.com/apache/spark/commit/f9c20bea19e1e03394a976c90012fc8744267065). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org