[GitHub] spark issue #19318: [SPARK-22096][ML] use aggregateByKeyLocally in feature f...
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/19318 @VinceShieh can you please mark this PR's title as "[WIP]"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13585: [SPARK-15859][SQL] Optimize the partition pruning within...
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/13585 Oh, yes, I am closing it; I will reopen it when we have another idea. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel closed the pull request at: https://github.com/apache/spark/pull/13585
[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/17936 I understand that any code change in Spark core will be hard to review because of regression concerns; I think we can leave the PR open for discussion.
1) Actually, `UnsafeCartesianRDD` is not aware of block locality and will re-fetch data from a remote node even when the data has already been fetched by another task on the same node; that is why we have to change some code in `BlockManager`.
2) Some existing RDD-based applications, such as `MLlib`, still use `CartesianRDD`, and we can observe a 50x performance boost in ALS prediction. Previously we could not even finish ALS prediction without this optimization until we had tuned a lot of things.
3) Repeatable data block iteration is probably very useful for new API implementations such as a Cartesian product for machine learning, for performance reasons. Unfortunately, `BlockManager` does not provide this feature, and we may add other operations based on this improvement in the future; that is why we think it is important.
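To illustrate point 3, here is a minimal standalone Scala sketch of a "repeatable" block: it materializes a remotely fetched block once so later iterations on the same node reuse the local copy. This is not Spark's actual `BlockManager` API; the class and the `fetchRemote` closure are hypothetical names for illustration only.

```scala
// Hypothetical sketch: materialize a remotely fetched block once, so that
// repeated iterations do not trigger repeated remote fetches.
class RepeatableBlock[T](fetchRemote: () => Iterator[T]) {
  private var cached: Option[Vector[T]] = None
  var remoteFetches: Int = 0 // for illustration: how often we actually went remote

  def iterator: Iterator[T] = synchronized {
    val data = cached.getOrElse {
      remoteFetches += 1
      val materialized = fetchRemote().toVector // fetch once, keep locally
      cached = Some(materialized)
      materialized
    }
    data.iterator
  }
}

val block = new RepeatableBlock[Int](() => Iterator(1, 2, 3))
```

Iterating `block.iterator` twice yields the same data while `remoteFetches` stays at 1, which is the behavior the comment argues `BlockManager` should support.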
[GitHub] spark issue #17359: [SPARK-20028][SQL] Add aggreagate expression nGrams
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/17359 @rxin nGram is a built-in UDAF in Hive, and some users have complained that they hit performance issues when running queries with nGram.
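For readers unfamiliar with the function: an n-gram aggregate counts occurrences of length-n token windows. A minimal Scala sketch of the core counting step (this is an illustration of the semantics, not Hive's or the PR's implementation; the function name is made up):

```scala
// Count word n-grams across a collection of tokenized sentences.
// Sentences shorter than n contribute nothing (sliding would emit a short window,
// so we filter on window length).
def countNGrams(sentences: Seq[Seq[String]], n: Int): Map[Seq[String], Int] =
  sentences
    .flatMap(tokens => tokens.sliding(n).filter(_.length == n))
    .groupBy(identity)
    .map { case (gram, occurrences) => gram -> occurrences.size }
```

For example, `countNGrams(Seq(Seq("a", "b", "a", "b")), 2)` counts the bigram `a b` twice and `b a` once.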
[GitHub] spark pull request #16245: [SPARK-18824][SQL] Add optimizer rule to reorder ...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/16245#discussion_r97211716
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -514,6 +514,34 @@ case class OptimizeCodegen(conf: CatalystConf) extends Rule[LogicalPlan] {

 /**
+ * Reorders the predicates in `Filter` so more expensive expressions like UDF can evaluate later.
+ */
+object ReorderPredicatesInFilter extends Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(pred, child) =>
+      // Extracts deterministic suffix expressions from Filter predicate.
+      val expressions = splitConjunctivePredicates(pred)
+      // The beginning index of the deterministic suffix expressions.
+      var splitIndex = -1
+      (expressions.length - 1 to 0 by -1).foreach { idx =>
+        if (splitIndex == -1 && !expressions(idx).deterministic) {
+          splitIndex = idx + 1
+        }
+      }
+      if (splitIndex == expressions.length) {
+        // All expressions are non-deterministic, no reordering.
+        f
+      } else {
+        val (nonDeterminstics, deterministicExprs) = expressions.splitAt(splitIndex)
--- End diff --
Hmm, actually that's what I meant; perhaps `non-deterministic` is being confused with `non-foldable`? I think we can skip both of them in short-circuit evaluation. Since those expressions are not `stateful` (unfortunately, Spark SQL expressions don't have the concept of `stateful`), skipping their evaluation is harmless, and this is exactly the short-circuit logic of the `AND` expression. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
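The index scan in the quoted diff can be modeled outside Catalyst. A simplified standalone sketch, using a made-up `Pred` case class with a `deterministic` flag in place of Catalyst expressions: it finds where the trailing run of deterministic conjuncts begins, i.e. the suffix that is safe to reorder.

```scala
// Simplified stand-in for a Catalyst expression: just a name and a flag.
case class Pred(name: String, deterministic: Boolean)

// Split the conjuncts into (prefix ending at the last non-deterministic one,
// deterministic suffix). Mirrors the splitIndex scan in the diff above.
def splitDeterministicSuffix(conjuncts: Seq[Pred]): (Seq[Pred], Seq[Pred]) = {
  val lastNonDet = conjuncts.lastIndexWhere(!_.deterministic)
  conjuncts.splitAt(lastNonDet + 1) // -1 + 1 == 0 when all conjuncts are deterministic
}
```

With conjuncts `a AND rand AND b`, the split is `([a, rand], [b])`: only `b` sits in the reorderable deterministic suffix, because moving a deterministic predicate ahead of `rand` could change how often `rand` is evaluated.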
[GitHub] spark issue #16245: [SPARK-18824][SQL] Add optimizer rule to reorder Filter ...
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/16245 I think that's true most of the time for `Scala UDF needs extra conversion between internal format and external format on input and output`, but not all of the time; for example, some built-in string-based operations and their combinations are also quite heavy to evaluate. Most likely, this is a concern for an experienced SQL developer who wants to write optimal (business-related, short-circuiting) SQL expressions.
[GitHub] spark pull request #16245: [SPARK-18824][SQL] Add optimizer rule to reorder ...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/16245#discussion_r97211489
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -514,6 +514,34 @@ case class OptimizeCodegen(conf: CatalystConf) extends Rule[LogicalPlan] {

 /**
+ * Reorders the predicates in `Filter` so more expensive expressions like UDF can evaluate later.
+ */
+object ReorderPredicatesInFilter extends Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(pred, child) =>
+      // Extracts deterministic suffix expressions from Filter predicate.
+      val expressions = splitConjunctivePredicates(pred)
+      // The beginning index of the deterministic suffix expressions.
+      var splitIndex = -1
+      (expressions.length - 1 to 0 by -1).foreach { idx =>
+        if (splitIndex == -1 && !expressions(idx).deterministic) {
+          splitIndex = idx + 1
+        }
+      }
+      if (splitIndex == expressions.length) {
+        // All expressions are non-deterministic, no reordering.
+        f
+      } else {
+        val (nonDeterminstics, deterministicExprs) = expressions.splitAt(splitIndex)
--- End diff --
I mean `(rand() > 0) && b` should be equal to `b && (rand() > 0)`, and the latter probably even has better performance, thanks to the short-circuit evaluation of `AND`, doesn't it?
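To make the short-circuit point concrete, a small standalone Scala sketch (the counter and `expensivePredicate` are made-up names for illustration): with `&&`, putting the cheap deterministic operand first means the expensive, non-deterministic operand runs only on rows that pass it.

```scala
import scala.util.Random

var expensiveCalls = 0
// Stand-in for an expensive, non-deterministic predicate such as `rand() > 0`.
def expensivePredicate(): Boolean = { expensiveCalls += 1; Random.nextDouble() > 0 }

val rows = Seq(false, false, true) // values of the cheap predicate `b` per row
// Cheap operand first: the expensive side is evaluated only where b is true.
val results = rows.map(b => b && expensivePredicate())
```

After the `map`, `expensiveCalls` is 1 rather than 3, which is why flipping the operand order can matter even when the two forms are logically equivalent.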
[GitHub] spark issue #16245: [SPARK-18824][SQL] Add optimizer rule to reorder Filter ...
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/16245 Actually I doubt this is really an optimization, since the assumption that a Scala UDF is slower than a non-Scala UDF is probably not always true.
[GitHub] spark pull request #16245: [SPARK-18824][SQL] Add optimizer rule to reorder ...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/16245#discussion_r97211330
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -514,6 +514,34 @@ case class OptimizeCodegen(conf: CatalystConf) extends Rule[LogicalPlan] {

 /**
+ * Reorders the predicates in `Filter` so more expensive expressions like UDF can evaluate later.
+ */
+object ReorderPredicatesInFilter extends Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(pred, child) =>
+      // Extracts deterministic suffix expressions from Filter predicate.
+      val expressions = splitConjunctivePredicates(pred)
+      // The beginning index of the deterministic suffix expressions.
+      var splitIndex = -1
+      (expressions.length - 1 to 0 by -1).foreach { idx =>
+        if (splitIndex == -1 && !expressions(idx).deterministic) {
+          splitIndex = idx + 1
+        }
+      }
+      if (splitIndex == expressions.length) {
+        // All expressions are non-deterministic, no reordering.
+        f
+      } else {
+        val (nonDeterminstics, deterministicExprs) = expressions.splitAt(splitIndex)
--- End diff --
I am a little confused about why we need to separate out the `non-deterministic` expressions. Should it be `stateful` or `foldable` instead?
[GitHub] spark issue #16476: [SPARK-19084][SQL][WIP] Implement expression field
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/16476 Since different data types will simply be ignored, I think we'd better also add the optimization rule in `Optimizer`, as well as the Python/Scala API support; but we need to confirm with @rxin why we don't need the API `field`.
[GitHub] spark pull request #16476: [SPARK-19084][SQL][WIP] Implement expression fiel...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/16476#discussion_r95283107
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -1528,6 +1528,18 @@ object functions {
   def factorial(e: Column): Column = withExpr { Factorial(e.expr) }

   /**
+   * Returns the index of str in (str1, str2, ...) list or 0 if not found.
+   * It takes at least 2 parameters, and all parameters' types should be subtypes of AtomicType.
+   *
+   * @group normal_funcs
+   * @since 2.2.0
+   */
+  @scala.annotation.varargs
+  def field(expr1: Column, expr2: Column, exprs: Column*): Column = withExpr {
--- End diff --
@rxin can you explain a bit why we removed this?
[GitHub] spark pull request #16476: [SPARK-19084][SQL][WIP] Implement expression fiel...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/16476#discussion_r95282681
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -340,3 +344,102 @@ object CaseKeyWhen {
     CaseWhen(cases, elseValue)
   }
 }
+
+/**
+ * A function that returns the index of expr in (expr1, expr2, ...) list or 0 if not found.
+ * It takes at least 2 parameters, and all parameters should be subtype of AtomicType or NullType.
+ * It's also acceptable to give parameters of different types.
+ * If the search string is NULL, the return value is 0 because NULL fails equality comparison with any value.
+ * When the paramters have different types, comparing will be done based on type firstly,
+ * for example, ''999'' won't be considered equal with 999, no implicit cast will be done here.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, expr1, expr2, ...) - Returns the index of expr in the expr1, expr2, ... or 0 if not found.",
+  extended = """
+    Examples:
+      > SELECT _FUNC_(10, 9, 3, 10, 4);
+       3
+      > SELECT _FUNC_('a', 'b', 'c', 'd', 'a');
+       4
+      > SELECT _FUNC_('999', 'a', 999, 9.99, '999');
+       4
+  """)
+case class Field(children: Seq[Expression]) extends Expression {
+
+  /** Even if expr is not found in (expr1, expr2, ...) list, the value will be 0, not null */
+  override def nullable: Boolean = false
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(children(0).dataType)
+
+  private val dataTypeMatchIndex: Seq[Int] = children.tail.zip(Stream from 1).filter(
+    _._1.dataType == children.head.dataType).map(_._2)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (children.length <= 1) {
+      TypeCheckResult.TypeCheckFailure(s"FIELD requires at least 2 arguments")
+    } else if (!children.forall(
+        e => e.dataType.isInstanceOf[AtomicType] || e.dataType.isInstanceOf[NullType])) {
+      TypeCheckResult.TypeCheckFailure(s"FIELD requires all arguments to be of AtomicType")
+    } else
+      TypeCheckResult.TypeCheckSuccess
+  }
+
+  override def dataType: DataType = IntegerType
+  override def eval(input: InternalRow): Any = {
+    val target = children.head.eval(input)
+    val targetDataType = children.head.dataType
+    @tailrec def findEqual(index: Int): Int = {
+      if (index == dataTypeMatchIndex.size) {
+        0
+      } else {
+        val value = children(dataTypeMatchIndex(index)).eval(input)
+        if (value != null && ordering.equiv(target, value))
--- End diff --
use braces. see https://github.com/databricks/scala-style-guide#curly
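The semantics under review can be modeled outside Catalyst with plain Scala values. A hypothetical helper (the name and the runtime-class check are illustrative stand-ins, not the PR's code) that returns the 1-based index of the first argument among the rest, or 0 when absent or null, comparing only values of the same runtime class to mirror the "no implicit cast" rule in the doc comment above:

```scala
// 1-based index of `target` in `candidates`, or 0 if not found or target is null.
// Only candidates of the same runtime class as `target` are compared, so '999'
// (a String) never matches 999 (an Int), as the quoted doc comment describes.
def field(target: Any, candidates: Any*): Int =
  if (target == null) 0
  else {
    val idx = candidates.indexWhere(c =>
      c != null && c.getClass == target.getClass && c == target)
    idx + 1 // indexWhere returns -1 when absent, so this yields 0
  }
```

This reproduces the examples in the quoted `@ExpressionDescription`: `field(10, 9, 3, 10, 4)` is 3 and `field("999", "a", 999, 9.99, "999")` is 4.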
[GitHub] spark pull request #16476: [SPARK-19084][SQL][WIP] Implement expression fiel...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/16476#discussion_r95282465
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -340,3 +344,102 @@ object CaseKeyWhen {
     CaseWhen(cases, elseValue)
   }
 }
+
+/**
+ * A function that returns the index of expr in (expr1, expr2, ...) list or 0 if not found.
+ * It takes at least 2 parameters, and all parameters should be subtype of AtomicType or NullType.
+ * It's also acceptable to give parameters of different types.
+ * If the search string is NULL, the return value is 0 because NULL fails equality comparison with any value.
+ * When the paramters have different types, comparing will be done based on type firstly,
+ * for example, ''999'' won't be considered equal with 999, no implicit cast will be done here.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, expr1, expr2, ...) - Returns the index of expr in the expr1, expr2, ... or 0 if not found.",
+  extended = """
+    Examples:
+      > SELECT _FUNC_(10, 9, 3, 10, 4);
+       3
+      > SELECT _FUNC_('a', 'b', 'c', 'd', 'a');
+       4
+      > SELECT _FUNC_('999', 'a', 999, 9.99, '999');
+       4
+  """)
+case class Field(children: Seq[Expression]) extends Expression {
+
+  /** Even if expr is not found in (expr1, expr2, ...) list, the value will be 0, not null */
+  override def nullable: Boolean = false
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(children(0).dataType)
+
+  private val dataTypeMatchIndex: Seq[Int] = children.tail.zip(Stream from 1).filter(
+    _._1.dataType == children.head.dataType).map(_._2)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (children.length <= 1) {
+      TypeCheckResult.TypeCheckFailure(s"FIELD requires at least 2 arguments")
+    } else if (!children.forall(
+        e => e.dataType.isInstanceOf[AtomicType] || e.dataType.isInstanceOf[NullType])) {
+      TypeCheckResult.TypeCheckFailure(s"FIELD requires all arguments to be of AtomicType")
+    } else
+      TypeCheckResult.TypeCheckSuccess
+  }
+
+  override def dataType: DataType = IntegerType
+  override def eval(input: InternalRow): Any = {
+    val target = children.head.eval(input)
+    val targetDataType = children.head.dataType
+    @tailrec def findEqual(index: Int): Int = {
+      if (index == dataTypeMatchIndex.size) {
--- End diff --
if `dataTypeMatchIndex` is `Array[Int]`, then we'd better use `dataTypeMatchIndex.length` instead.
[GitHub] spark pull request #16476: [SPARK-19084][SQL][WIP] Implement expression fiel...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/16476#discussion_r95282270
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -340,3 +344,102 @@ object CaseKeyWhen {
     CaseWhen(cases, elseValue)
   }
 }
+
+/**
+ * A function that returns the index of expr in (expr1, expr2, ...) list or 0 if not found.
+ * It takes at least 2 parameters, and all parameters should be subtype of AtomicType or NullType.
+ * It's also acceptable to give parameters of different types.
+ * If the search string is NULL, the return value is 0 because NULL fails equality comparison with any value.
+ * When the paramters have different types, comparing will be done based on type firstly,
+ * for example, ''999'' won't be considered equal with 999, no implicit cast will be done here.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, expr1, expr2, ...) - Returns the index of expr in the expr1, expr2, ... or 0 if not found.",
+  extended = """
+    Examples:
+      > SELECT _FUNC_(10, 9, 3, 10, 4);
+       3
+      > SELECT _FUNC_('a', 'b', 'c', 'd', 'a');
+       4
+      > SELECT _FUNC_('999', 'a', 999, 9.99, '999');
+       4
+  """)
+case class Field(children: Seq[Expression]) extends Expression {
+
+  /** Even if expr is not found in (expr1, expr2, ...) list, the value will be 0, not null */
+  override def nullable: Boolean = false
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(children(0).dataType)
+
+  private val dataTypeMatchIndex: Seq[Int] = children.tail.zip(Stream from 1).filter(
+    _._1.dataType == children.head.dataType).map(_._2)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (children.length <= 1) {
+      TypeCheckResult.TypeCheckFailure(s"FIELD requires at least 2 arguments")
+    } else if (!children.forall(
+        e => e.dataType.isInstanceOf[AtomicType] || e.dataType.isInstanceOf[NullType])) {
+      TypeCheckResult.TypeCheckFailure(s"FIELD requires all arguments to be of AtomicType")
+    } else
+      TypeCheckResult.TypeCheckSuccess
+  }
+
+  override def dataType: DataType = IntegerType
+  override def eval(input: InternalRow): Any = {
+    val target = children.head.eval(input)
+    val targetDataType = children.head.dataType
--- End diff --
Unused code.
[GitHub] spark pull request #16476: [SPARK-19084][SQL][WIP] Implement expression fiel...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/16476#discussion_r95281248
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -340,3 +344,102 @@ object CaseKeyWhen {
     CaseWhen(cases, elseValue)
   }
 }
+
+/**
+ * A function that returns the index of expr in (expr1, expr2, ...) list or 0 if not found.
+ * It takes at least 2 parameters, and all parameters should be subtype of AtomicType or NullType.
+ * It's also acceptable to give parameters of different types.
+ * If the search string is NULL, the return value is 0 because NULL fails equality comparison with any value.
+ * When the paramters have different types, comparing will be done based on type firstly,
+ * for example, ''999'' won't be considered equal with 999, no implicit cast will be done here.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, expr1, expr2, ...) - Returns the index of expr in the expr1, expr2, ... or 0 if not found.",
+  extended = """
+    Examples:
+      > SELECT _FUNC_(10, 9, 3, 10, 4);
+       3
+      > SELECT _FUNC_('a', 'b', 'c', 'd', 'a');
+       4
+      > SELECT _FUNC_('999', 'a', 999, 9.99, '999');
+       4
+  """)
+case class Field(children: Seq[Expression]) extends Expression {
+
+  /** Even if expr is not found in (expr1, expr2, ...) list, the value will be 0, not null */
+  override def nullable: Boolean = false
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(children(0).dataType)
+
+  private val dataTypeMatchIndex: Seq[Int] = children.tail.zip(Stream from 1).filter(
--- End diff --
`zip(Stream from 1)`, do we really need it?
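For comparison, the same index list can be built without `Stream`. A standalone sketch using `zipWithIndex` (the `Expr` case class is a made-up stand-in carrying only a data type name, not Catalyst's `Expression`):

```scala
// Stand-in for an expression that only carries a data type name.
case class Expr(dataType: String)

// Equivalent to `children.tail.zip(Stream from 1).filter(...).map(_._2)`:
// 1-based positions (within children) of tail elements whose type matches the head.
def dataTypeMatchIndex(children: Seq[Expr]): Array[Int] =
  children.tail.zipWithIndex.collect {
    case (e, i) if e.dataType == children.head.dataType => i + 1
  }.toArray
```

`zipWithIndex` plus `collect` avoids building the intermediate `Stream`, and `toArray` also addresses the `Array[Int]` vs `Seq[Int]` concern raised in a sibling comment.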
[GitHub] spark pull request #16476: [SPARK-19084][SQL][WIP] Implement expression fiel...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/16476#discussion_r95281159
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -340,3 +344,102 @@ object CaseKeyWhen {
     CaseWhen(cases, elseValue)
   }
 }
+
+/**
+ * A function that returns the index of expr in (expr1, expr2, ...) list or 0 if not found.
+ * It takes at least 2 parameters, and all parameters should be subtype of AtomicType or NullType.
+ * It's also acceptable to give parameters of different types.
+ * If the search string is NULL, the return value is 0 because NULL fails equality comparison with any value.
+ * When the paramters have different types, comparing will be done based on type firstly,
+ * for example, ''999'' won't be considered equal with 999, no implicit cast will be done here.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, expr1, expr2, ...) - Returns the index of expr in the expr1, expr2, ... or 0 if not found.",
+  extended = """
+    Examples:
+      > SELECT _FUNC_(10, 9, 3, 10, 4);
+       3
+      > SELECT _FUNC_('a', 'b', 'c', 'd', 'a');
+       4
+      > SELECT _FUNC_('999', 'a', 999, 9.99, '999');
+       4
+  """)
+case class Field(children: Seq[Expression]) extends Expression {
+
+  /** Even if expr is not found in (expr1, expr2, ...) list, the value will be 0, not null */
+  override def nullable: Boolean = false
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(children(0).dataType)
+
+  private val dataTypeMatchIndex: Seq[Int] = children.tail.zip(Stream from 1).filter(
+    _._1.dataType == children.head.dataType).map(_._2)
--- End diff --
`_._1.dataType.sameType(children.head.dataType)`?
[GitHub] spark pull request #16476: [SPARK-19084][SQL][WIP] Implement expression fiel...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/16476#discussion_r95281046
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -340,3 +344,102 @@ object CaseKeyWhen {
     CaseWhen(cases, elseValue)
   }
 }
+
+/**
+ * A function that returns the index of expr in (expr1, expr2, ...) list or 0 if not found.
+ * It takes at least 2 parameters, and all parameters should be subtype of AtomicType or NullType.
+ * It's also acceptable to give parameters of different types.
+ * If the search string is NULL, the return value is 0 because NULL fails equality comparison with any value.
+ * When the paramters have different types, comparing will be done based on type firstly,
+ * for example, ''999'' won't be considered equal with 999, no implicit cast will be done here.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, expr1, expr2, ...) - Returns the index of expr in the expr1, expr2, ... or 0 if not found.",
+  extended = """
+    Examples:
+      > SELECT _FUNC_(10, 9, 3, 10, 4);
+       3
+      > SELECT _FUNC_('a', 'b', 'c', 'd', 'a');
+       4
+      > SELECT _FUNC_('999', 'a', 999, 9.99, '999');
+       4
+  """)
+case class Field(children: Seq[Expression]) extends Expression {
+
+  /** Even if expr is not found in (expr1, expr2, ...) list, the value will be 0, not null */
+  override def nullable: Boolean = false
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(children(0).dataType)
+
+  private val dataTypeMatchIndex: Seq[Int] = children.tail.zip(Stream from 1).filter(
--- End diff --
`Array[Int]` instead? `Seq[Int]` is probably a linked `List` in its concrete implementation.
[GitHub] spark issue #16476: [SPARK-19084][SQL] Implement expression field
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/16476 @gczsjdy can you please add [WIP] to the title until you feel the code is ready for review?
[GitHub] spark pull request #16476: [SPARK-19084][SQL] Implement expression field
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/16476#discussion_r95080769
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -340,3 +341,91 @@ object CaseKeyWhen {
     CaseWhen(cases, elseValue)
   }
 }
+
+/**
+ * A function that returns the index of str in (str1, str2, ...) list or 0 if not found.
+ * It takes at least 2 parameters, and all parameters' types should be subtypes of AtomicType.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(str, str1, str2, ...) - Returns the index of str in the str1,str2,... or 0 if not found.",
+  extended = """
+    Examples:
+      > SELECT _FUNC_(10, 9, 3, 10, 4);
+        3
+  """)
+case class Field(children: Seq[Expression]) extends Expression {
+
+  override def nullable: Boolean = false
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(children(0).dataType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (children.length <= 1) {
+      TypeCheckResult.TypeCheckFailure(s"FIELD requires at least 2 arguments")
+    } else if (!children.forall(_.dataType.isInstanceOf[AtomicType])) {
+      TypeCheckResult.TypeCheckFailure(s"FIELD requires all arguments to be of AtomicType")
+    } else
+      TypeCheckResult.TypeCheckSuccess
+  }
+
+  override def dataType: DataType = IntegerType
+
+  override def eval(input: InternalRow): Any = {
+    val target = children.head.eval(input)
+    val targetDataType = children.head.dataType
+    def findEqual(target: Any, params: Seq[Expression], index: Int): Int = {
+      params.toList match {
--- End diff --
`toList` probably causes performance overhead; I don't think we have to sacrifice performance just to use pattern matching. In the meantime, I still believe we don't have to check the data type at runtime. That is supposed to be done at `compile` time, or only once on the first call to `eval`.
The `Field` evaluation is quite confusing; as @gatorsmile suggested, we need to describe how the value is evaluated when the sub-expressions' data types differ.
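For readers following along, the FIELD semantics under review (the 1-based index of the first matching argument, or 0 when nothing matches) can be sketched without the per-row `toList` allocation flagged in the comment. This is a hypothetical illustration in plain Java, not the PR's Catalyst code:

```java
import java.util.Objects;

public class FieldSketch {
    // 1-based index of the first candidate equal to target, 0 if none match,
    // mirroring the SQL FIELD semantics described in the usage string above.
    // A plain indexed loop needs no per-call toList allocation.
    public static int field(Object target, Object... candidates) {
        for (int i = 0; i < candidates.length; i++) {
            if (Objects.equals(target, candidates[i])) {
                return i + 1; // SQL FIELD is 1-based
            }
        }
        return 0; // not found
    }
}
```

For instance, `FieldSketch.field(10, 9, 3, 10, 4)` returns 3, matching the `SELECT _FUNC_(10, 9, 3, 10, 4)` example in the docstring.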
[GitHub] spark pull request #16476: [SPARK-19084][SQL] Implement expression field
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/16476#discussion_r95080582
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- (same hunk as quoted above)
--- End diff --
You can add the `@tailrec` annotation to declare that explicitly.
[GitHub] spark issue #15579: Added support for extra command in front of spark.
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/15579 Oh, thank you @jerryshao, I just noticed you gave input as well. :)
[GitHub] spark issue #15579: Added support for extra command in front of spark.
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/15579 @srowen Besides `numactl`, we may also need profiling tools such as `valgrind`, `strace`, and `vtune`, as well as system-call hooks, before the executor process launches. I agree that providing a prefixed command via configuration is probably not that secure, but we would like an early code review to see whether the community will accept it. @rxin Follow-up commits with unit tests and with standalone and Mesos mode support will be added soon. @sheepduke please update the PR description and title first.
[GitHub] spark issue #15361: [SPARK-17765][SQL] Support for writing out user-defined ...
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/15361 yes, please go ahead. :)
[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks from di...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/10225#discussion_r77748327
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -136,7 +136,9 @@ private[spark] class IndexShuffleBlockResolver(
       shuffleId: Int,
       mapId: Int,
       lengths: Array[Long],
-      dataTmp: File): Unit = {
--- End diff --
Do we have to change the code in this function?
[GitHub] spark issue #14366: [SPARK-16732][SQL] Remove unused codes in subexpressionE...
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/14366 Ping @rxin, it seems the upstream is not updated.
[GitHub] spark issue #12646: [SPARK-14878][SQL] Trim characters string function suppo...
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/12646 I like this PR since it's part of the SQL standard, but there is also another JIRA, https://issues.apache.org/jira/browse/SPARK-17299; maybe we can fix that in a follow-up PR. Can you, @kevinyu98?
[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/12646#discussion_r76966164
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2677,4 +2678,107 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       data.selectExpr("`part.col1`", "`col.1`"))
   }
 }
+
+  test("TRIM function-BOTH") {
+    val ae1 = intercept[AnalysisException] {
+      sql("SELECT TRIM(BOTH 'aa' FROM 'aabcaa')").collect()
--- End diff --
Sorry, ignore this.
[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/12646#discussion_r76966028
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala --- (same hunk as quoted above)
--- End diff --
Nit: Do we really need this test?
[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/12646#discussion_r76965552
--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -476,6 +476,61 @@ public UTF8String trim() {
     }
   }
+
+  /**
+   * Removes all specified trim character either from the beginning or the ending of a string
+   * @param trimChar the trim character
+   */
+  public UTF8String trim (UTF8String trimChar) {
--- End diff --
No space after `trim`.
[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/12646#discussion_r76965110
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---
@@ -1789,6 +1803,133 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
+
+  test("TRIM function-BOTH") {
+    withTable("trimBoth", "trimStrut") {
+      sql("create table trimBoth (c1 string, c2 char(1), c3 string, c4 string, " +
+        "c5 string, c6 string)")
+      // scalastyle:off
+      sql("insert into trimBoth select 'cc', 'c', ' cccbacc', 'cccbaccæ°', 'æ°æ®ç 头', 'æ°'")
+      // scalastyle:on
+      sql("create table trimStrut (c1 struct, c2 string)")
+      sql("insert into trimStrut values ((100, 'abc'), 'ABC')")
+
+      intercept[AnalysisException] {
+        sql("SELECT TRIM('c', C1, 'd') from trimBoth")
+      }
+      intercept[AnalysisException] {
+        sql("SELECT TRIM('cc', C1) from trimBoth").collect
+      }
+      intercept[AnalysisException] {
+        sql("SELECT TRIM(C2, C1) from trimBoth").collect
+      }
+      intercept[AnalysisException] {
+        sql("SELECT TRIM(BOTH C2 FROM C1) from trimBoth").collect
+      }
+      intercept[AnalysisException] {
+        sql("select trim(c1,c2) from trimStrut")
+      }
+      intercept[
--- End diff --
Typo from hitting the `enter` key?
[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/12646#discussion_r76963822
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala ---
@@ -431,56 +432,233 @@ case class FindInSet(left: Expression, right: Expression) extends BinaryExpressi
 }

 /**
- * A function that trim the spaces from both ends for the specified string.
+ * A function that trim the spaces or a character from both ends for the specified string.
  */
 @ExpressionDescription(
-  usage = "_FUNC_(str) - Removes the leading and trailing space characters from str.",
-  extended = "> SELECT _FUNC_('SparkSQL ');\n 'SparkSQL'")
-case class StringTrim(child: Expression)
-  extends UnaryExpression with String2StringExpression {
+  usage = "_FUNC_(str) - Removes the leading and trailing space characters or char from str.",
+  extended = "> SELECT _FUNC_('SparkSQL ');\n 'SparkSQL'\n" +
+    "> SELECT _FUNC_('S', 'SSparkSQLS');\n 'parkSQL'\n" +
+    "> SELECT _FUNC_(BOTH 'S' FROM 'SSparkSQLS');\n 'parkSQL'\n" +
+    "> SELECT _FUNC_(LEADING 'S' FROM 'SSparkSQLS');\n 'parkSQLS'\n" +
+    "> SELECT _FUNC_(TRAILING 'S' FROM 'SSparkSQLS');\n 'SSparkSQL'")
+case class StringTrim(children: Seq[Expression])
+  extends Expression with ImplicitCastInputTypes {
+
+  require (children.size <= 2 && children.nonEmpty,
+    "$prettyName requires at least one argument and no more than two.")
+
+  override def dataType: DataType = StringType
+  override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.size)(StringType)

-  def convert(v: UTF8String): UTF8String = v.trim()
+  override def nullable: Boolean = children.exists(_.nullable)
+  override def foldable: Boolean = children.forall(_.foldable)

   override def prettyName: String = "trim"

+  override def eval(input: InternalRow): Any = {
+    val inputs = children.map(_.eval(input).asInstanceOf[UTF8String])
+    if (inputs(0) != null) {
+      if (children.size == 1) {
+        return inputs(0).trim()
+      } else if (inputs(1) != null) {
+        if (inputs(0).numChars > 1) {
+          throw new AnalysisException(s"Trim character '${inputs(0)}' can not be greater than " +
+            s"1 character.")
+        } else {
+          return inputs(1).trim(inputs(0))
+        }
+      }
+    }
+    null
+  }
+
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    defineCodeGen(ctx, ev, c => s"($c).trim()")
+    if (children.size == 2 &&
+      (! children(0).isInstanceOf[Literal] || children(0).toString.length > 1)) {
+      throw new AnalysisException(s"The trimming parameter should be Literal " +
+        s"and only one character.")
     }
+
+    val evals = children.map(_.genCode(ctx))
+    val inputs = evals.map { eval =>
+      s"${eval.isNull} ? null : ${eval.value}"
+    }
+    val getTrimFunction = if (children.size == 1) {
+      s"""UTF8String ${ev.value} = ${inputs(0)}.trim();"""
+    } else {
+      s"""UTF8String ${ev.value} = ${inputs(1)}.trim(${inputs(0)});""".stripMargin
+    }
+    ev.copy(evals.map(_.code).mkString("\n") +
+      s"""
+        boolean ${ev.isNull} = false;
+        ${getTrimFunction};
+        if (${ev.value} == null) {
+          ${ev.isNull} = true;
+        }
+      """)
+  }
+
+  override def sql: String = {
+    if (children.size == 1) {
+      val childrenSQL = children.map(_.sql).mkString(", ")
+      s"$prettyName($childrenSQL)"
+    } else {
+      val trimSQL = children(0).map(_.sql).mkString(", ")
+      val tarSQL = children(1).map(_.sql).mkString(", ")
+      s"$prettyName($trimSQL, $tarSQL)"
+    }
   }
 }

 /**
- * A function that trim the spaces from left end for given string.
+ * A function that trim the spaces or a character from left end for given string.
  */
 @ExpressionDescription(
   usage = "_FUNC_(str) - Removes the leading space characters from str.",
-  extended = "> SELECT _FUNC_('SparkSQL ');\n 'SparkSQL '")
-case class StringTrimLeft(child: Expression)
-  extends UnaryExpression with String2StringExpression {
+  extended = "> SELECT _FUNC_('SparkSQL ');\n 'SparkSQL '\n" +
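For reference, the `TRIM(BOTH trimChar FROM str)` behavior shown in the usage examples above can be sketched over a plain Java `String` with a single trim character. This is only an illustration of the intended semantics, not the `UTF8String`/codegen implementation under review:

```java
public class TrimBothSketch {
    // Strips every leading and trailing occurrence of the trim character,
    // e.g. trimBoth("SSparkSQLS", 'S') yields "parkSQL" as in the docs above.
    public static String trimBoth(String s, char c) {
        int b = 0;
        int e = s.length();
        while (b < e && s.charAt(b) == c) b++;     // leading occurrences
        while (e > b && s.charAt(e - 1) == c) e--; // trailing occurrences
        return s.substring(b, e);
    }
}
```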
[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/12646#discussion_r76963598
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- (same hunk as quoted above)
[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/12646#discussion_r76963573
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- (same hunk as quoted above)
[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/12646#discussion_r76963406
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- (same hunk as quoted above)
[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/12646#discussion_r76962244
--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -501,6 +578,38 @@ public UTF8String trimRight() {
     }
   }
+
+  /**
+   * Removes all specified trim character from the ending of a string
+   * @param trimChar the trim character
+   */
+  public UTF8String trimRight(UTF8String trimChar) {
+    int numTrimBytes = trimChar.numBytes;
+    if (numTrimBytes == 0) {
+      return this;
+    }
+    int e = this.numBytes - numTrimBytes;
+    // skip all the consecutive matching character in the right side
+    // index 'e' points to first no matching byte position in the input string from right side.
+    // Index 'e' moves the number of bytes of the trimming character first.
+    if (e < 0) {
+      e = this.numBytes - 1;
+    } else {
+      while (e >= 0 && e == this.rfind(trimChar, e)) {
+        e -= numTrimBytes;
+      }
+      if (e >= 0) {
+        e += numTrimBytes - 1;
+      }
+    }
+
+    if (e < 0) {
+      // empty string
+      return UTF8String.fromBytes(new byte[0]);
--- End diff --
`UTF8String.EMPTY_UTF8`
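The trailing-trim behavior in the hunk above can be sketched over a plain Java `String` (single trim character, scanning from the end), without the byte-level index bookkeeping of `rfind`. An illustrative sketch of the intended semantics, not the `UTF8String` code:

```java
public class TrimRightSketch {
    // Removes all trailing occurrences of trimChar; returns "" when the whole
    // string is trimmed away (analogous to returning an empty UTF8String).
    public static String trimRight(String s, char trimChar) {
        int e = s.length();
        while (e > 0 && s.charAt(e - 1) == trimChar) {
            e--; // skip consecutive matches from the right
        }
        return s.substring(0, e);
    }
}
```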
[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/12646#discussion_r76962088
--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -488,6 +543,28 @@ public UTF8String trimLeft() {
     }
   }
+
+  /**
+   * Removes all specified trim character from the beginning of a string
+   * @param trimChar the trim character
+   */
+  public UTF8String trimLeft(UTF8String trimChar) {
+    int numTrimBytes = trimChar.numBytes;
+    if (numTrimBytes == 0) {
+      return this;
+    }
+    int s = 0;
+    // skip all the consecutive matching character in the left side
+    while (s < this.numBytes && s == this.find(trimChar, s)) {
+      s += numTrimBytes;
+    }
+    if (s == this.numBytes) {
+      // empty string
+      return UTF8String.fromBytes(new byte[0]);
--- End diff --
`UTF8String.EMPTY_UTF8` instead?
[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/12646#discussion_r76961869
--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- (same hunk as quoted above)
--- End diff --
Oh, sorry, my bad; after checking the `find` function, it should not be a problem here. Please ignore my previous comment.
[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12646#discussion_r76961503

    --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
    @@ -488,6 +543,28 @@ public UTF8String trimLeft() {
         }
       }

    +  /**
    +   * Removes all specified trim character from the beginning of a string
    +   * @param trimChar the trim character
    +   */
    +  public UTF8String trimLeft(UTF8String trimChar) {
    +    int numTrimBytes = trimChar.numBytes;
    +    if (numTrimBytes == 0) {
    +      return this;
    +    }
    +    int s = 0;
    +    // skip all the consecutive matching character in the left side
    +    while(s < this.numBytes && s == this.find(trimChar, s)) {
    --- End diff --

    Calling `this.find(trimChar, s)` in a loop probably causes a performance issue. I would suggest re-implementing it rather than calling the existing function.
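Following up on this suggestion, here is a standalone sketch (hypothetical illustrative code, not the actual `UTF8String` patch) of what a direct byte-comparison loop could look like: it skips leading occurrences of a possibly multi-byte trim sequence without re-running a generic `find` on every iteration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class TrimLeftSketch {
    // Strips every leading occurrence of trimChar (a UTF-8 byte sequence)
    // by comparing bytes in place instead of calling a search routine.
    static byte[] trimLeft(byte[] str, byte[] trimChar) {
        if (trimChar.length == 0) return str;
        int s = 0;
        while (s + trimChar.length <= str.length && regionMatches(str, s, trimChar)) {
            s += trimChar.length;
        }
        return Arrays.copyOfRange(str, s, str.length);
    }

    private static boolean regionMatches(byte[] str, int offset, byte[] pattern) {
        for (int i = 0; i < pattern.length; i++) {
            if (str[offset + i] != pattern[i]) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] out = trimLeft("xxabc".getBytes(StandardCharsets.UTF_8),
                              "x".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(out, StandardCharsets.UTF_8)); // prints "abc"
    }
}
```

The real implementation would additionally need to return `UTF8String.EMPTY_UTF8` when everything is trimmed, as suggested in the other review comment.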
[GitHub] spark issue #14481: [WIP][SPARK-16844][SQL] Generate code for sort based agg...
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/14481 @yucai can you please rebase the code?
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13585#discussion_r72184495

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper {
             .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a)
           }
         }
    +
    +  /**
    +   * Drop the non-partition key expression from the given expression, to optimize the
    +   * partition pruning. For instances: (We assume part1 & part2 are the partition keys):
    +   *   (part1 == 1 and a > 3) or (part2 == 2 and a < 5)    ==> (part1 == 1 or part1 == 2)
    +   *   (part1 == 1 and a > 3) or (a < 100)                 ==> None
    +   *   (a > 100 && b < 100) or (part1 = 10)                ==> None
    +   *   (a > 100 && b < 100 and part1 = 10) or (part1 == 2) ==> (part1 = 10 or part1 == 2)
    +   * @param predicate The given expression
    +   * @param partitionKeyIds partition keys in attribute set
    +   * @return
    +   */
    +  def extractPartitionKeyExpression(
    +      predicate: Expression, partitionKeyIds: AttributeSet): Option[Expression] = {
    +    // drop the non-partition key expression in conjunction of the expression tree
    +    val additionalPartPredicate = predicate transformUp {
    --- End diff --

    I can keep updating the code if we agree on the approach; otherwise, I think we'd better close this PR for now.
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13585#discussion_r72184424

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper {
             .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a)
           }
         }
    +
    +  /**
    +   * Drop the non-partition key expression from the given expression, to optimize the
    +   * partition pruning. For instances: (We assume part1 & part2 are the partition keys):
    +   *   (part1 == 1 and a > 3) or (part2 == 2 and a < 5)    ==> (part1 == 1 or part1 == 2)
    +   *   (part1 == 1 and a > 3) or (a < 100)                 ==> None
    +   *   (a > 100 && b < 100) or (part1 = 10)                ==> None
    +   *   (a > 100 && b < 100 and part1 = 10) or (part1 == 2) ==> (part1 = 10 or part1 == 2)
    +   * @param predicate The given expression
    +   * @param partitionKeyIds partition keys in attribute set
    +   * @return
    +   */
    +  def extractPartitionKeyExpression(
    +      predicate: Expression, partitionKeyIds: AttributeSet): Option[Expression] = {
    +    // drop the non-partition key expression in conjunction of the expression tree
    +    val additionalPartPredicate = predicate transformUp {
    --- End diff --

    This PR may have critical bugs when a user implements a UDF that logically behaves like the `NOT` operator in the partition filter expression. We probably need a whitelist of the built-in UDFs. @yhuai @liancheng @yangw1234 @clockfly any comments on this?
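The extraction rule being discussed can be illustrated with a toy sketch (hypothetical code, not the Catalyst implementation): any conjunct that references no partition column is dropped, which only weakens the predicate, and a disjunction survives only if both sides still constrain a partition key. Note that, like the comment warns, this sketch deliberately ignores `NOT` and opaque UDFs, where dropping conjuncts would be unsound.

```java
import java.util.Set;

public class PruneSketch {
    // Minimal stand-in for a predicate tree: leaves name the column they test.
    interface Expr {}
    record Leaf(String column, String op) implements Expr {}
    record And(Expr l, Expr r) implements Expr {}
    record Or(Expr l, Expr r) implements Expr {}

    // Returns a weaker predicate mentioning only partition columns,
    // or null when no partition-only predicate can be extracted.
    static Expr prune(Expr e, Set<String> partCols) {
        if (e instanceof Leaf leaf) {
            return partCols.contains(leaf.column()) ? leaf : null;
        } else if (e instanceof And and) {
            Expr l = prune(and.l(), partCols);
            Expr r = prune(and.r(), partCols);
            if (l == null) return r;  // dropping a conjunct only weakens the filter
            if (r == null) return l;
            return new And(l, r);
        } else {
            Or or = (Or) e;
            Expr l = prune(or.l(), partCols);
            Expr r = prune(or.r(), partCols);
            // if either disjunct has no partition constraint, the OR prunes nothing
            return (l == null || r == null) ? null : new Or(l, r);
        }
    }

    public static void main(String[] args) {
        // (part1 == 1 and a > 3) or (part2 == 2 and a < 5)
        Expr e = new Or(
            new And(new Leaf("part1", "== 1"), new Leaf("a", "> 3")),
            new And(new Leaf("part2", "== 2"), new Leaf("a", "< 5")));
        System.out.println(prune(e, Set.of("part1", "part2")));
    }
}
```

Running the example reduces the predicate to `(part1 == 1) or (part2 == 2)`, matching the first case in the quoted scaladoc.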
[GitHub] spark pull request #14169: [SPARK-16515][SQL]set default record reader and w...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14169#discussion_r71085323

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
    @@ -1329,7 +1329,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         // SPARK-10310: Special cases LazySimpleSerDe
         val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
    -      Try(conf.getConfString(configKey)).toOption
    --- End diff --

    The default value is different for each `key`; do you mean to inline the `defaultRecordHandler` function?
[GitHub] spark issue #14169: [SPARK-16515][SQL]set default record reader and writer f...
Github user chenghao-intel commented on the issue:

    https://github.com/apache/spark/pull/14169

    HiveConf provides the default values `org.apache.hadoop.hive.ql.exec.TextRecordReader` and `org.apache.hadoop.hive.ql.exec.TextRecordWriter` for the keys `hive.script.recordreader` and `hive.script.recordwriter` respectively; however, SQLConf doesn't provide those keys, which means the default values will be null. This causes the backward incompatibility.
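The fallback being described could look roughly like this (a hedged sketch: the map-based `getConf` helper is hypothetical, while the two keys and their Hive default classes are the ones named in the comment above):

```java
import java.util.HashMap;
import java.util.Map;

public class ConfDefaults {
    // Defaults that HiveConf ships for script transforms; a conf that lacks
    // these entries would otherwise return null and break backward compatibility.
    static final Map<String, String> DEFAULTS = new HashMap<>();
    static {
        DEFAULTS.put("hive.script.recordreader",
            "org.apache.hadoop.hive.ql.exec.TextRecordReader");
        DEFAULTS.put("hive.script.recordwriter",
            "org.apache.hadoop.hive.ql.exec.TextRecordWriter");
    }

    // Looks up the key in the user's conf, falling back to the Hive default.
    static String getConf(Map<String, String> conf, String key) {
        String v = conf.get(key);
        return v != null ? v : DEFAULTS.get(key);
    }

    public static void main(String[] args) {
        System.out.println(getConf(new HashMap<>(), "hive.script.recordreader"));
    }
}
```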
[GitHub] spark issue #14169: [SPARK-16515][SQL]set default record reader and writer f...
Github user chenghao-intel commented on the issue:

    https://github.com/apache/spark/pull/14169

    LGTM. cc @yhuai @liancheng This breaks existing applications that use the default delimiter; we've already verified this in TPCx-BB.
[GitHub] spark issue #13542: [SPARK-15730][SQL] Respect the --hiveconf in the spark-s...
Github user chenghao-intel commented on the issue:

    https://github.com/apache/spark/pull/13542

    @yhuai I couldn't find any code that copies the `HiveConf` (from SessionState) into `SQLConf`. Can you confirm this? That's probably the reason why `--hiveconf` doesn't work.
[GitHub] spark issue #13542: [SPARK-15730][SQL] Respect the --hiveconf in the spark-s...
Github user chenghao-intel commented on the issue:

    https://github.com/apache/spark/pull/13542

    Thanks @jameszhouyi, I've removed the `WIP` from the title.
[GitHub] spark pull request #13542: [SPARK-15730][SQL] Respect the --hiveconf in the ...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13542#discussion_r67083155

    --- Diff: sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala ---
    @@ -91,6 +91,8 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
            | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$jdbcUrl
            | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
            | --hiveconf ${ConfVars.SCRATCHDIR}=$scratchDirPath
    +       | --hiveconf conf1=conftest
    +       | --hiveconf conf2=1
    --- End diff --

    @yhuai any concerns about this?
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13585#discussion_r66743318

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -65,15 +65,20 @@ private[hive] trait HiveStrategies {
             // hive table scan operator to be used for partition pruning.
             val partitionKeyIds = AttributeSet(relation.partitionKeys)
             val (pruningPredicates, otherPredicates) = predicates.partition { predicate =>
    -          !predicate.references.isEmpty &&
    +          predicate.references.nonEmpty &&
                 predicate.references.subsetOf(partitionKeyIds)
             }
    +        val additionalPartPredicates =
    +          PhysicalOperation.partitionPrunningFromDisjunction(
    +            otherPredicates.foldLeft[Expression](Literal(true))(And(_, _)), partitionKeyIds)

             pruneFilterProject(
               projectList,
               otherPredicates,
               identity[Seq[Expression]],
    -          HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil
    +          HiveTableScanExec(_,
    +            relation,
    +            pruningPredicates ++ additionalPartPredicates)(sparkSession)) :: Nil
    --- End diff --

    Thanks @clockfly for pointing out the exception as well. :)
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13585#discussion_r66743131

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -65,15 +65,20 @@ private[hive] trait HiveStrategies {
             // hive table scan operator to be used for partition pruning.
             val partitionKeyIds = AttributeSet(relation.partitionKeys)
             val (pruningPredicates, otherPredicates) = predicates.partition { predicate =>
    -          !predicate.references.isEmpty &&
    +          predicate.references.nonEmpty &&
                 predicate.references.subsetOf(partitionKeyIds)
             }
    +        val additionalPartPredicates =
    +          PhysicalOperation.partitionPrunningFromDisjunction(
    +            otherPredicates.foldLeft[Expression](Literal(true))(And(_, _)), partitionKeyIds)

             pruneFilterProject(
               projectList,
               otherPredicates,
               identity[Seq[Expression]],
    -          HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil
    +          HiveTableScanExec(_,
    +            relation,
    +            pruningPredicates ++ additionalPartPredicates)(sparkSession)) :: Nil
    --- End diff --

    Thanks @yangw1234, I will update the code to be stricter about the partition pruning filter extraction.
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13585#discussion_r66742892

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper {
             .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a)
           }
         }
    +
    +  /**
    +   * Drop the non-partition key expression in the disjunctions, to optimize the partition pruning.
    +   * For instances: (We assume part1 & part2 are the partition keys)
    +   *   (part1 == 1 and a > 3) or (part2 == 2 and a < 5)    ==> (part1 == 1 or part1 == 2)
    +   *   (part1 == 1 and a > 3) or (a < 100)                 ==> None
    +   *   (a > 100 && b < 100) or (part1 = 10)                ==> None
    +   *   (a > 100 && b < 100 and part1 = 10) or (part1 == 2) ==> (part1 = 10 or part1 == 2)
    +   * @param predicate disjunctions
    +   * @param partitionKeyIds partition keys in attribute set
    +   * @return
    +   */
    +  def partitionPrunningFromDisjunction(
    +      predicate: Expression, partitionKeyIds: AttributeSet): Option[Expression] = {
    +    // ignore the pure non-partition key expression in conjunction of the expression tree
    +    val additionalPartPredicate = predicate transformUp {
    +      case a @ And(left, right) if a.deterministic &&
    +        left.references.intersect(partitionKeyIds).isEmpty => right
    +      case a @ And(left, right) if a.deterministic &&
    +        right.references.intersect(partitionKeyIds).isEmpty => left
    --- End diff --

    OK, I got it; thanks for the explanation.
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13585#discussion_r66741771

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -65,15 +65,20 @@ private[hive] trait HiveStrategies {
             // hive table scan operator to be used for partition pruning.
             val partitionKeyIds = AttributeSet(relation.partitionKeys)
             val (pruningPredicates, otherPredicates) = predicates.partition { predicate =>
    -          !predicate.references.isEmpty &&
    +          predicate.references.nonEmpty &&
                 predicate.references.subsetOf(partitionKeyIds)
             }
    +        val additionalPartPredicates =
    +          PhysicalOperation.partitionPrunningFromDisjunction(
    +            otherPredicates.foldLeft[Expression](Literal(true))(And(_, _)), partitionKeyIds)

             pruneFilterProject(
               projectList,
               otherPredicates,
               identity[Seq[Expression]],
    -          HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil
    +          HiveTableScanExec(_,
    +            relation,
    +            pruningPredicates ++ additionalPartPredicates)(sparkSession)) :: Nil
    --- End diff --

    Sorry @clockfly, I am not sure what you mean; this PR is not designed to depend on the Optimizer (CNF). Can you please give a more concrete example if there is a bug?
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13585#discussion_r66733226

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -65,15 +65,20 @@ private[hive] trait HiveStrategies {
             // hive table scan operator to be used for partition pruning.
             val partitionKeyIds = AttributeSet(relation.partitionKeys)
             val (pruningPredicates, otherPredicates) = predicates.partition { predicate =>
    -          !predicate.references.isEmpty &&
    +          predicate.references.nonEmpty &&
                 predicate.references.subsetOf(partitionKeyIds)
             }
    +        val additionalPartPredicates =
    +          PhysicalOperation.partitionPrunningFromDisjunction(
    +            otherPredicates.foldLeft[Expression](Literal(true))(And(_, _)), partitionKeyIds)

             pruneFilterProject(
               projectList,
               otherPredicates,
               identity[Seq[Expression]],
    -          HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil
    +          HiveTableScanExec(_,
    +            relation,
    +            pruningPredicates ++ additionalPartPredicates)(sparkSession)) :: Nil
    --- End diff --

    @yangw1234 @liancheng @clockfly `pruningPredicates ++ additionalPartPredicates` is the partition filter, and the original filter still needs to be applied after the partitions are pruned.
[GitHub] spark issue #13585: [SPARK-15859][SQL] Optimize the partition pruning within...
Github user chenghao-intel commented on the issue:

    https://github.com/apache/spark/pull/13585

    Updated with a more meaningful function name and added more unit tests.
[GitHub] spark issue #13585: [SPARK-15859][SQL] Optimize the partition pruning within...
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/13585 cc @liancheng
[GitHub] spark pull request #13542: [SPARK-15730][SQL][WIP] Respect the --hiveconf in...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13542#discussion_r66731077

    --- Diff: sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala ---
    @@ -91,6 +91,8 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
            | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$jdbcUrl
            | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
            | --hiveconf ${ConfVars.SCRATCHDIR}=$scratchDirPath
    +       | --hiveconf conf1=conftest
    +       | --hiveconf conf2=1
    --- End diff --

    Yes, it works; that's the intention, right? But it seems the code below in `SparkSQLCliDriver` will not work as we expect:

    ```scala
    if (key != "javax.jdo.option.ConnectionURL") {
      conf.set(key, value)
      sessionState.getOverriddenConfigurations.put(key, value)
    }
    ```

    Why do we have to ignore the connection URL?
[GitHub] spark issue #13530: [SPARK-14279][BUILD] Pick the spark version from pom
Github user chenghao-intel commented on the issue:

    https://github.com/apache/spark/pull/13530

    `spark-version-info.properties` cannot be found on my development machine, which causes an NPE while debugging in the IDE. Should we add the default version info back for developers like me?
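One possible shape for such a fallback is sketched below (illustrative only: the class name, resource path handling, and the `<unknown>` placeholder are assumptions, not Spark's actual version plumbing). The point is to degrade gracefully when the generated properties file is absent instead of throwing an NPE.

```java
import java.io.InputStream;
import java.util.Properties;

public class VersionInfo {
    // Loads spark-version-info.properties from the classpath, falling back
    // to placeholder values when the file was never generated (e.g. when
    // running straight from an IDE without a full build).
    static Properties load() {
        Properties props = new Properties();
        try (InputStream in =
                 VersionInfo.class.getResourceAsStream("/spark-version-info.properties")) {
            if (in != null) props.load(in);
        } catch (Exception e) {
            // treat any read failure the same as a missing file
        }
        props.putIfAbsent("version", "<unknown>");
        props.putIfAbsent("revision", "<unknown>");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(load().getProperty("version"));
    }
}
```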
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13585#discussion_r66714698

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper {
             .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a)
           }
         }
    +
    +  /**
    +   * Drop the non-partition key expression in the disjunctions, to optimize the partition pruning.
    --- End diff --

    Oh, OK. Originally I thought the conjunction cases were already handled in `collectProjectsAndFilters` before being passed into this function, and that here we only handle the `AND` inside the disjunction. (You can see this in HiveTableScans in HiveStrategies.scala.) Anyway, you convinced me. :)
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13585#discussion_r66714358

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -65,15 +65,20 @@ private[hive] trait HiveStrategies {
             // hive table scan operator to be used for partition pruning.
             val partitionKeyIds = AttributeSet(relation.partitionKeys)
             val (pruningPredicates, otherPredicates) = predicates.partition { predicate =>
    -          !predicate.references.isEmpty &&
    +          predicate.references.nonEmpty &&
                 predicate.references.subsetOf(partitionKeyIds)
             }
    +        val additionalPartPredicates =
    +          PhysicalOperation.partitionPrunningFromDisjunction(
    +            otherPredicates.foldLeft[Expression](Literal(true))(And(_, _)), partitionKeyIds)

             pruneFilterProject(
               projectList,
               otherPredicates,
               identity[Seq[Expression]],
    -          HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil
    +          HiveTableScanExec(_,
    +            relation,
    +            pruningPredicates ++ additionalPartPredicates)(sparkSession)) :: Nil
    --- End diff --

    For `HiveTableScan`, the predicate here is only meant to minimize the partition scanning, so what we need to do is provide a more specific partition pruning predicate. Sorry if something was confusing.
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13585#discussion_r66714324

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper {
             .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a)
           }
         }
    +
    +  /**
    +   * Drop the non-partition key expression in the disjunctions, to optimize the partition pruning.
    --- End diff --

    I think it should be `disjunction`. For example: `(part=1 and a=1) or (part=2 and a=4)` is a disjunction, right?
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13585#discussion_r66714314

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala ---
    @@ -65,4 +69,95 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
           sql("DROP TABLE IF EXISTS createAndInsertTest")
         }
       }
    +
    +  test("partition pruning in disjunction") {
    +    withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "true")) {
    +      val testData = sparkContext.parallelize(
    +        (1 to 10).map(i => TestData(i, i.toString))).toDF()
    +      testData.registerTempTable("testData")
    +
    +      val testData2 = sparkContext.parallelize(
    +        (11 to 20).map(i => TestData(i, i.toString))).toDF()
    +      testData2.registerTempTable("testData2")
    +
    +      val testData3 = sparkContext.parallelize(
    +        (21 to 30).map(i => TestData(i, i.toString))).toDF()
    +      testData3.registerTempTable("testData3")
    +
    +      val testData4 = sparkContext.parallelize(
    +        (31 to 40).map(i => TestData(i, i.toString))).toDF()
    +      testData4.registerTempTable("testData4")
    +
    +      val tmpDir = Files.createTempDir()
    +      // create the table for test
    +      sql(s"CREATE TABLE table_with_partition(key int,value string) " +
    +        s"PARTITIONED by (ds string, ds2 string) location '${tmpDir.toURI.toString}' ")
    +      sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1', ds2='d1') " +
    +        "SELECT key,value FROM testData")
    +      sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2', ds2='d1') " +
    +        "SELECT key,value FROM testData2")
    +      sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3', ds2='d3') " +
    +        "SELECT key,value FROM testData3")
    +      sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4', ds2='d4') " +
    +        "SELECT key,value FROM testData4")
    +
    +      checkAnswer(sql("select key,value from table_with_partition"),
    +        testData.collect ++ testData2.collect ++ testData3.collect ++ testData4.collect)
    +
    +      checkAnswer(
    +        sql(
    +          """select key,value from table_with_partition
    +            | where (ds='4' and key=38) or (ds='3' and key=22)""".stripMargin),
    +        Row(38, "38") :: Row(22, "22") :: Nil)
    +
    +      checkAnswer(
    +        sql(
    +          """select key,value from table_with_partition
    +            | where (key<40 and key>38) or (ds='3' and key=22)""".stripMargin),
    +        Row(39, "39") :: Row(22, "22") :: Nil)
    +
    +      sql("DROP TABLE table_with_partition")
    +      sql("DROP TABLE createAndInsertTest")
    --- End diff --

    good catch. :)
[GitHub] spark issue #13585: [SPARK-15859][SQL] Optimize the partition pruning within...
Github user chenghao-intel commented on the issue:

    https://github.com/apache/spark/pull/13585

    Thank you all for the review, but I am not going to solve CNF; the intention of this PR is to extract more partition pruning expressions, so we have fewer partitions to scan during the table scan. But I did find some bugs in this PR and will add more unit tests soon.
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66714297 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper { .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a) } } + + /** + * Drop the non-partition key expression in the disjunctions, to optimize the partition pruning. + * For instances: (We assume part1 & part2 are the partition keys) + * (part1 == 1 and a > 3) or (part2 == 2 and a < 5) ==> (part1 == 1 or part1 == 2) + * (part1 == 1 and a > 3) or (a < 100) => None + * (a > 100 && b < 100) or (part1 = 10) => None + * (a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or part1 == 2) + * @param predicate disjunctions + * @param partitionKeyIds partition keys in attribute set + * @return + */ + def partitionPrunningFromDisjunction( +predicate: Expression, partitionKeyIds: AttributeSet): Option[Expression] = { +// ignore the pure non-partition key expression in conjunction of the expression tree +val additionalPartPredicate = predicate transformUp { + case a @ And(left, right) if a.deterministic && +left.references.intersect(partitionKeyIds).isEmpty => right + case a @ And(left, right) if a.deterministic && +right.references.intersect(partitionKeyIds).isEmpty => left --- End diff -- Actually, the output of `!(partition = 1 && a > 3)` should be `!(partition=1)`, what should be dropped here is the `a>3`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
GitHub user chenghao-intel opened a pull request: https://github.com/apache/spark/pull/13585

[SPARK-15859][SQL] Optimize the partition pruning within the disjunction

## What changes were proposed in this pull request?

In a disjunction, the partition pruning expression can simply ignore a non-partitioned expression that appears in the same conjunct. For instance:

```scala
(part1 == 1 and a > 3) or (part2 == 2 and a < 5)    ==> (part1 == 1 or part1 == 2)
(part1 == 1 and a > 3) or (a < 100)                 ==> None
(a > 100 && b < 100) or (part1 = 10)                ==> None
(a > 100 && b < 100 and part1 = 10) or (part1 == 2) ==> (part1 = 10 or part1 == 2)
```

This PR only works for the HiveTableScan; I will submit another PR to optimize the data source API back-end scan.

## How was this patch tested?

A unit test is included in this PR.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chenghao-intel/spark partition

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/13585.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #13585

commit 08519f2e7a3222cb791e6ce1b8af0c132ff16b29
Author: Cheng Hao <hao.ch...@intel.com>
Date: 2016-06-08T08:48:52Z

    optimize the partition pruning
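The rewrite rules above can be sketched on a toy expression tree. This is an illustrative model of the idea only, not Catalyst's real `Expression` classes; every name below is made up. The key asymmetry: in a conjunction the non-partition side may be dropped safely, while in a disjunction both sides must reduce to partition predicates or the branch prunes nothing.

```scala
// Toy expression tree (illustrative only -- NOT Catalyst's real classes).
sealed trait Expr
case class Attr(name: String) extends Expr
case class Pred(attr: Attr, repr: String) extends Expr // e.g. Pred(Attr("part1"), "part1 == 1")
case class And(left: Expr, right: Expr) extends Expr
case class Or(left: Expr, right: Expr) extends Expr

object PartitionPruning {
  /** Attributes referenced by an expression. */
  def refs(e: Expr): Set[String] = e match {
    case Attr(n)    => Set(n)
    case Pred(a, _) => refs(a)
    case And(l, r)  => refs(l) ++ refs(r)
    case Or(l, r)   => refs(l) ++ refs(r)
  }

  /**
   * Keep only partition-key predicates. In a conjunction the non-partition
   * side may be dropped (the result only becomes weaker, never wrong); in a
   * disjunction BOTH sides must reduce to a partition predicate, otherwise
   * the branch prunes nothing and we must return None.
   */
  def prune(e: Expr, partKeys: Set[String]): Option[Expr] = e match {
    case p: Pred if refs(p).subsetOf(partKeys) => Some(p)
    case _: Pred => None
    case And(l, r) =>
      (prune(l, partKeys), prune(r, partKeys)) match {
        case (Some(a), Some(b)) => Some(And(a, b))
        case (Some(a), None)    => Some(a) // drop the non-partition side
        case (None, Some(b))    => Some(b)
        case (None, None)       => None
      }
    case Or(l, r) =>
      for (a <- prune(l, partKeys); b <- prune(r, partKeys)) yield Or(a, b)
    case _ => None
  }
}

val partKeys = Set("part1", "part2")
// (part1 == 1 and a > 3) or (part2 == 2 and a < 5) ==> (part1 == 1 or part2 == 2)
val disjunct = Or(
  And(Pred(Attr("part1"), "part1 == 1"), Pred(Attr("a"), "a > 3")),
  And(Pred(Attr("part2"), "part2 == 2"), Pred(Attr("a"), "a < 5")))
// (part1 == 1 and a > 3) or (a < 100) ==> None, since the right branch has no partition key
val noPrune = Or(
  And(Pred(Attr("part1"), "part1 == 1"), Pred(Attr("a"), "a > 3")),
  Pred(Attr("a"), "a < 100"))
val pruned = PartitionPruning.prune(disjunct, partKeys)
```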
[GitHub] spark issue #13542: [SPARK-15730][SQL][WIP] Respect the --hiveconf in the sp...
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/13542 Currently, the SparkSQL CLI ignores the configuration passed from the command line via `--hiveconf`; this will break lots of existing applications. It's not by design, is it? @yhuai @rxin We are still verifying this PR by running TPCx-BB; I will remove the WIP once it passes the test.
[GitHub] spark issue #13542: [SPARK-15730][SQL][WIP] Respect the --hiveconf in the sp...
Github user chenghao-intel commented on the issue: https://github.com/apache/spark/pull/13542 cc @yhuai
[GitHub] spark pull request #13542: [SPARK-15730][SQL][WIP] Respect the --hiveconf in...
GitHub user chenghao-intel opened a pull request: https://github.com/apache/spark/pull/13542

[SPARK-15730][SQL][WIP] Respect the --hiveconf in the spark-sql command line

## What changes were proposed in this pull request?

We should respect the `--hiveconf` options in the spark-sql command line; otherwise, existing applications based on Spark 1.6 and earlier will break, as the configurations specified via `--hiveconf` go missing.

## How was this patch tested?

I've added a unit test, but it still needs to be verified with a real application.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chenghao-intel/spark hiveconf

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/13542.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #13542

commit bbf0ac4da1278722f82abd38b5cc7b0f94f17a60
Author: Cheng Hao <hao.ch...@intel.com>
Date: 2016-06-07T14:55:27Z

    respect the --hiveconf in the SparkSQLCliDriver commandline
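For context, "respecting `--hiveconf`" boils down to collecting the `key=value` pairs passed on the command line into a conf map before the session starts. A minimal sketch of that parsing step (the function name and parsing rules are illustrative assumptions, not the actual SparkSQLCliDriver code):

```scala
// Hypothetical parser: pick up every `--hiveconf key=value` pair from argv.
// Windows of size 2 overlap, but we only match when the first element is the flag.
def parseHiveConf(args: Array[String]): Map[String, String] =
  args.sliding(2).collect {
    case Array("--hiveconf", kv) if kv.contains('=') =>
      val Array(k, v) = kv.split("=", 2) // split on the first '=' only
      k -> v
  }.toMap

val conf = parseHiveConf(Array("--hiveconf", "hive.cli.print.header=true", "-e", "select 1"))
```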
[GitHub] spark pull request: [SPARK-15480][UI][Streaming]show missed InputI...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/13259#issuecomment-221166631 cc @zsxwing
[GitHub] spark pull request: [SPARK-14631][SQL][WIP]drop database cascade s...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/12391#issuecomment-209937183 LGTM except some minor comments.
[GitHub] spark pull request: [SPARK-14631][SQL][WIP]drop database cascade s...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/12391#discussion_r59711889

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala ---

@@ -46,4 +48,23 @@ class HiveExternalCatalogSuite extends CatalogTestCases {
   protected override def resetState(): Unit = client.reset()
+
+  import utils._
+
+  test("drop database cascade with function defined") {
+    import org.apache.spark.sql.catalyst.expressions.Lower
+
+    val catalog = newEmptyCatalog()
+    val dbName = "dbCascade"
+    val path = newUriForDatabase()
+    catalog.createDatabase(CatalogDatabase(dbName, "", path, Map.empty), ignoreIfExists = false)
+    // create a permanent function in catalog
+    catalog.createFunction(dbName, CatalogFunction(
+      FunctionIdentifier("func1", Some(dbName)), Lower.getClass.getName, Nil))

--- End diff --

Nit: `Lower.getClass` => `classOf[Lower]`
[GitHub] spark pull request: [SPARK-14631][SQL][WIP]drop database cascade s...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/12391#discussion_r59711729

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala ---

@@ -17,20 +17,22 @@
 package org.apache.spark.sql.hive

+import java.io.File
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.util.VersionInfo

 import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader}
-import org.apache.spark.util.Utils

 /**
  * Test suite for the [[HiveExternalCatalog]].
  */
 class HiveExternalCatalogSuite extends CatalogTestCases {

-  private val client: HiveClient = {
+  private lazy val client: HiveClient = {

--- End diff --

Does this have to be `lazy`?
[GitHub] spark pull request: [SPARK-14631][SQL][WIP]drop database cascade s...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/12391#discussion_r59711674

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala ---

@@ -17,20 +17,22 @@
 package org.apache.spark.sql.hive

+import java.io.File

--- End diff --

Unused import?
[GitHub] spark pull request: [SPARK-12610][SQL] Add Anti join operators
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/10563#issuecomment-206693926 Closing this PR since it's been merged in #12214
[GitHub] spark pull request: [SPARK-12610][SQL] Add Anti join operators
Github user chenghao-intel closed the pull request at: https://github.com/apache/spark/pull/10563
[GitHub] spark pull request: [SPARK-12196][Core] Store/retrieve blocks in d...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/10225#issuecomment-205712538 @JoshRosen I am not sure if this is still part of your refactoring, or whether we can bring this PR back up? This PR is a critical performance improvement when mixing PCI-E SSDs and HDDs, particularly for scenarios that shuffle large amounts of data.
[GitHub] spark pull request: [SPARK-14021][SQL] custom context support for ...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/11843#issuecomment-202395548 cc @yhuai, this is critical for our own customized `HiveContext`; can you please merge this?
[GitHub] spark pull request: [SPARK-14021][SQL] custom context support for ...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/11843#issuecomment-200900425 cc @rxin @liancheng
[GitHub] spark pull request: [SPARK-14021][SQL][WIP] custom context support...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11843#discussion_r57120180

--- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala ---

@@ -34,6 +34,20 @@ private[hive] object SparkSQLEnv extends Logging {
   var hiveContext: HiveContext = _
   var sparkContext: SparkContext = _

+  private def readContextClassFromConf(sparkConf: SparkConf): Class[_ <: HiveContext] = {
+    val className =
+      sparkConf.get("spark.sql.context.class", "org.apache.spark.sql.hive.HiveContext")

--- End diff --

Instead of hard-coding the class name, can we make it `classOf[HiveContext].getName()`?
[GitHub] spark pull request: [SPARK-13889][YARN] Fix integer overflow when ...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/11713#issuecomment-197131091 cc @rxin @JoshRosen
[GitHub] spark pull request: [SPARK-13895][SQL]Change the return type of Da...
GitHub user chenghao-intel opened a pull request: https://github.com/apache/spark/pull/11731

[SPARK-13895][SQL] Change the return type of DataFrameReader.text

## What changes were proposed in this pull request?

Change the return type of `DataFrameReader.text` from `DataFrame` to `Dataset[String]`.

## How was this patch tested?

No additional unit test required.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chenghao-intel/spark dfreader

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/11731.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #11731

commit be163d6b353fde70565ff962bd3dea391f932120
Author: Cheng Hao <hao.ch...@intel.com>
Date: 2016-03-15T13:58:04Z

    change the return type of DataFrameReader.text from DataFrame to Dataset[String]
[GitHub] spark pull request: [SPARK-13889][YARN] Fix integer overflow when ...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/11713#issuecomment-196816685 BTW, @carsonwang, can you also describe what would happen without this change to applications with dynamic allocation enabled? This will help people understand the impact of this bug fix.
[GitHub] spark pull request: [SPARK-13894][SQL] SqlContext.range return typ...
GitHub user chenghao-intel opened a pull request: https://github.com/apache/spark/pull/11730

[SPARK-13894][SQL] SqlContext.range return type from DataFrame to DataSet

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-13894 Change the return type of the `range` API from `DataFrame` to `Dataset`.

## How was this patch tested?

No additional unit test required.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chenghao-intel/spark range

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/11730.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #11730

commit b520eb576f6e49b4c5abe53d1ec7b0800d4a79f9
Author: Cheng Hao <hao.ch...@intel.com>
Date: 2016-03-15T12:26:02Z

    SqlContext.range return type from DataFrame to DataSet
[GitHub] spark pull request: [SPARK-13889][YARN] Fix integer overflow when ...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11713#discussion_r56130697

--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---

@@ -73,7 +73,8 @@ private[spark] class ApplicationMaster(
     } else {
       sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0)
     }
-    val defaultMaxNumExecutorFailures = math.max(3, 2 * effectiveNumExecutors)
+    val defaultMaxNumExecutorFailures = math.max(3,

--- End diff --

Can you add a comment here saying that `effectiveNumExecutors` is `Int.MaxValue` when dynamic allocation is enabled by default?
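The overflow is easy to demonstrate: with dynamic allocation enabled, `effectiveNumExecutors` defaults to `Int.MaxValue`, so `2 * effectiveNumExecutors` wraps around to a negative number and `math.max(3, ...)` silently collapses to 3. A guard along these lines avoids the wrap (a sketch of the idea, not necessarily the exact expression merged in the PR):

```scala
val effectiveNumExecutors = Int.MaxValue // default when dynamic allocation is on

// Before the fix: 2 * Int.MaxValue overflows to -2, so the failure limit becomes 3.
val broken = math.max(3, 2 * effectiveNumExecutors)

// One way to guard: do the doubling in Long arithmetic and clamp back into Int range.
val fixed = math.max(3, math.min(2L * effectiveNumExecutors, Int.MaxValue).toInt)
```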
[GitHub] spark pull request: [SPARK-11517][SQL]Calc partitions in parallel ...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/9483#issuecomment-189470296 LGTM except some minor suggestions.
[GitHub] spark pull request: [SPARK-11517][SQL]Calc partitions in parallel ...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/9483#discussion_r54296531

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala ---

@@ -89,4 +89,25 @@ class HiveTableScanSuite extends HiveComparisonTest {
     assert(sql("select CaseSensitiveColName from spark_4959_2").head() === Row("hi"))
     assert(sql("select casesensitivecolname from spark_4959_2").head() === Row("hi"))
   }
+

--- End diff --

remove the extra empty line.
[GitHub] spark pull request: [SPARK-11517][SQL]Calc partitions in parallel ...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/9483#discussion_r54296448

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala ---

@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.util.concurrent.Callable
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.rdd.{RDD, UnionPartition, UnionRDD}
+import org.apache.spark.util.ThreadUtils
+
+object ParallelUnionRDD {

--- End diff --

`private[hive]` or move it into the upper level package? The same for the class `ParallelUnionRDD`.
[GitHub] spark pull request: [SPARK-11517][SQL]Calc partitions in parallel ...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/9483#discussion_r54296499

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala ---

@@ -0,0 +1,53 @@
+package org.apache.spark.sql.hive
+
+import java.util.concurrent.Callable
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.rdd.{RDD, UnionPartition, UnionRDD}
+import org.apache.spark.util.ThreadUtils
+
+object ParallelUnionRDD {
+  lazy val executorService = ThreadUtils.newDaemonFixedThreadPool(16, "ParallelUnionRDD")
+}
+
+class ParallelUnionRDD[T: ClassTag](
+    sc: SparkContext,
+    rdds: Seq[RDD[T]]) extends UnionRDD[T](sc, rdds){
+
+  override def getPartitions: Array[Partition] = {
+    // Calc partitions field for each RDD in parallel.
+    val rddPartitions = rdds.map {rdd =>
+      (rdd, ParallelUnionRDD.executorService.submit(new Callable[Array[Partition]] {
+        override def call(): Array[Partition] = rdd.partitions
+      }))
+    }.map {case(r, f) => (r, f.get())}

--- End diff --

space before `}` and after `{`
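The pattern in the diff above (submit one `Callable` per child RDD, and only then block on the futures, so the per-RDD `partitions` computations overlap) can be sketched without any Spark dependency. Here `slowPartitionCount` is a stand-in for an expensive `rdd.partitions` call; all names are illustrative:

```scala
import java.util.concurrent.{Callable, Executors}

val pool = Executors.newFixedThreadPool(4)

// Pretend each call hits the metastore / filesystem and is slow.
def slowPartitionCount(id: Int): Int = { Thread.sleep(50); id * 10 }

val ids = Seq(1, 2, 3, 4)

// Submit everything first, then collect the futures: the waits overlap,
// so the total latency is roughly one call rather than four.
val counts = ids
  .map(id => pool.submit(new Callable[Int] { override def call(): Int = slowPartitionCount(id) }))
  .map(_.get())

pool.shutdown()
```

The two-step `map` is the essence of the trick: a single `map` that submitted and immediately called `get()` on each future would serialize the work again.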
[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r54170956

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/AccumulableCheckpoint.scala ---

@@ -0,0 +1,37 @@
+package org.apache.spark.streaming
+
+import org.apache.spark.{Accumulable, AccumulableParam, SparkException}
+
+/**
+ * Store information of Accumulable. We can't checkpoint Accumulable dircectly because the
+ * "readObject" method of Accumulable to add extra logic.
+ */
+class AccumulableCheckpoint[R, T] private (
+    val name: String,
+    val value: R,
+    val param: AccumulableParam[R, T]) extends Serializable{

--- End diff --

Space before `{`
[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r54170867

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---

@@ -269,6 +270,33 @@ class StreamingContext private[streaming] (
     RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
   }
+
+  private[streaming] val recoverableAccuNameToAcc: mutable.Map[String, Accumulable[_, _]] =
+    mutable.Map.empty
+
+  /**
+   * Different from accumulator in SparkContext, it will first try to recover from Checkpoint
+   * if it exist.
+   *
+   * @param initialValue initial value of accumulator. It will be ignored when recovering from

--- End diff --

We don't do alignment for scala doc in Spark.

```scala
@param initialValue Initial value of accumulator. It will be ignored when recovering from..
@param name The name is required as identity to find corresponding accumulator.
```
[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/11249#discussion_r54170612

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---

@@ -269,6 +270,33 @@ class StreamingContext private[streaming] (
+  /**
+   * Different from accumulator in SparkContext, it will first try to recover from Checkpoint
+   * if it exist.
+   *
+   * @param initialValue initial value of accumulator. It will be ignored when recovering from
+   *                     checkpoint
+   * @param name name is required as identity to find corresponding accumulator.
+   */
+  def getOrCreateRecoverableAccumulator[T](initialValue: T, name: String)
+      (implicit param: AccumulatorParam[T]): Accumulator[T] = {
+
+    def registerNewAccumulator(_initialV: T): Accumulator[T] = {
+      val acc = sc.accumulator(_initialV, name)
+      recoverableAccuNameToAcc(name) = acc
+      acc
+    }
+
+    val newInitialValue: T = if (isCheckpointPresent) {
+      _cp.trackedAccs.find(_.name == name).map(_.value).getOrElse(initialValue).asInstanceOf[T]
+    } else initialValue

--- End diff --

Nit:

```scala
if (...) {
  ...
} else {
  ...
}
```
[GitHub] spark pull request: [SPARK-13222][Streaming][WIP]make sure latest ...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11101#discussion_r54167619

    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---
    @@ -123,6 +126,15 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
           timedOut
         }

    +    // generate one more batch to make sure RDD in lastJob is checkpointed. As an performance
    +    // optimization, if the latest info has been checkpointed in last batch, there is no need
    +    // to run another round. "isCheckpointMissedLastTime" method here is in charge of collect
    +    // such information from every DStream recursively.
    +    if (!jobScheduler.receiverTracker.hasUnallocatedBlocks &&
    +      ssc.graph.isCheckpointMissedLastTime) {
    +      Thread.sleep(ssc.graph.batchDuration.milliseconds)
    +    }
    --- End diff --

    I just wonder whether we can add a double check that the last mini batch has actually finished; otherwise, we should at least log a warning.
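The double check suggested above could look roughly like the sketch below. This is only an illustration: `isLastBatchCompleted` is a hypothetical helper that neither this PR nor `DStreamGraph` provides, and `logWarning` comes from Spark's `Logging` trait that `JobGenerator` already mixes in.

```scala
// Sketch only: after sleeping one batch duration, verify the last mini batch
// really finished before proceeding, and warn if it did not.
// `isLastBatchCompleted` is a hypothetical helper, not part of the PR.
if (!jobScheduler.receiverTracker.hasUnallocatedBlocks &&
    ssc.graph.isCheckpointMissedLastTime) {
  Thread.sleep(ssc.graph.batchDuration.milliseconds)
  if (!ssc.graph.isLastBatchCompleted) {
    logWarning("Last mini batch did not finish within one batch duration; " +
      "its checkpoint data may be incomplete.")
  }
}
```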
[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11249#discussion_r53925855

    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
    @@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
      val checkpointDuration = ssc.checkpointDuration
      val pendingTimes = ssc.scheduler.getPendingTimes().toArray
      val sparkConfPairs = ssc.conf.getAll
    + @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId
    --- End diff --

    The exception is about `ssc` (the StreamingContext), probably reached through the `recoverableAccuNameToId` property. Meanwhile, the calculation of `val trackedAccs = ...` should happen on the master node, not on every executor; I suspect this exception was caused by something else.
[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11249#discussion_r53419256

    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -269,6 +270,39 @@ class StreamingContext private[streaming] (
         RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
       }

    + private[streaming] val recoverableAccuNameToId: mutable.Map[String, Long] = {
    --- End diff --

    Yes, it will add strong references; that's why I said we need to reset `StreamingContext.recoverableAccuNameToId` in the `stop` / `shutdown` method.
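As a sketch of that suggestion, the reset could live at the end of `StreamingContext.stop`; the exact signature and placement here are assumptions for illustration, not code from the PR:

```scala
// Sketch: clear the map during shutdown so the strong references to the
// accumulators do not outlive the StreamingContext.
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
  // ... existing shutdown logic ...
  recoverableAccuNameToId.clear()  // drop strong references on shutdown
}
```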
[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11249#discussion_r53419167

    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
    @@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
      val checkpointDuration = ssc.checkpointDuration
      val pendingTimes = ssc.scheduler.getPendingTimes().toArray
      val sparkConfPairs = ssc.conf.getAll
    + @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId
    --- End diff --

    Hmmm, I don't think so; `ssc.` is used everywhere in this class. Can you try it? Probably the exception you met was caused by something else.
[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11249#discussion_r53419010

    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
    @@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
      val checkpointDuration = ssc.checkpointDuration
      val pendingTimes = ssc.scheduler.getPendingTimes().toArray
      val sparkConfPairs = ssc.conf.getAll
    + @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId
    +
    + // initialize from ssc.context after SPARK-13051
    + val trackedAccs: Array[AccumulableCheckpoint[_, _]] = Accumulators.originals.filter(ele =>
    --- End diff --

    OK, I see. Let's see if we can do something inside `Accumulable.readObject`; the current approach is more of a hacky workaround.
[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11249#discussion_r53417873

    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -269,6 +270,39 @@ class StreamingContext private[streaming] (
         RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
       }

    + private[streaming] val recoverableAccuNameToId: mutable.Map[String, Long] = {
    --- End diff --

    I don't think memory space is the problem, since Java collections always store object references, not duplicated objects. The reason I suggested doing that is to simplify the code at https://github.com/apache/spark/pull/11249/files#diff-f0064bc8820551c338276e29d922e459R47 , which means we don't need to look the Accumulator up again by accumulator id; in particular, `Accumulators.originals` stores `WeakReference` objects, so the lookup can return null.
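The `WeakReference` point can be illustrated against the Spark 1.x internals under discussion, where `Accumulators.originals` was a `Map[Long, WeakReference[Accumulable[_, _]]]`. This is a sketch of the two lookup styles, not code from the PR:

```scala
// Lookup by id goes through a WeakReference, which GC may already have
// cleared, so the result can be empty even for a registered accumulator:
val byId: Option[Accumulable[_, _]] =
  Accumulators.originals.get(accId).flatMap(ref => Option(ref.get))

// Storing the Accumulator itself in the name -> accumulator map keeps a
// strong reference, so no second lookup (and no possible null) is needed:
recoverableAccuNameToAcc(name) = sc.accumulator(0, name)
```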
[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11249#discussion_r53417573

    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
    @@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
      val checkpointDuration = ssc.checkpointDuration
      val pendingTimes = ssc.scheduler.getPendingTimes().toArray
      val sparkConfPairs = ssc.conf.getAll
    + @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId
    +
    + // initialize from ssc.context after SPARK-13051
    + val trackedAccs: Array[AccumulableCheckpoint[_, _]] = Accumulators.originals.filter(ele =>
    --- End diff --

    I just checked the code of `Accumulable.readObject()`; it seems never to be used. Were you hitting a problem with that?
[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11249#discussion_r53417343

    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
    @@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
      val checkpointDuration = ssc.checkpointDuration
      val pendingTimes = ssc.scheduler.getPendingTimes().toArray
      val sparkConfPairs = ssc.conf.getAll
    + @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId
    --- End diff --

    I am not talking about the `@transient`; I mean we can move this inside https://github.com/apache/spark/pull/11249/files#diff-f0064bc8820551c338276e29d922e459R48
[GitHub] spark pull request: [SPARK-13222][Streaming][WIP]make sure latest ...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11101#discussion_r53417276

    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---
    @@ -123,6 +126,12 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
           timedOut
         }

    +    // generate one more bacth to make sure RDD in lastJob is checkpointed.
    +    if (!jobScheduler.receiverTracker.hasUnallocatedBlocks &&
    +      ssc.graph.isCheckpointMissedLastTime) {
    +      Thread.sleep(ssc.graph.batchDuration.milliseconds)
    --- End diff --

    OK. I was wondering whether it would be more graceful for this thread to be woken up as soon as the batch job finishes, rather than sleeping for a fixed length of time.
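One way to wake the generator on batch completion rather than sleeping could combine a `StreamingListener` with a `CountDownLatch`. The wiring below is only a sketch of that idea, not code from either PR:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Sketch: block until the batch completes, using the batch duration only as
// an upper bound, instead of an unconditional Thread.sleep.
val lastBatchDone = new CountDownLatch(1)
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit =
    lastBatchDone.countDown()
})
// Returns as soon as a batch finishes, or after one batch duration at most.
lastBatchDone.await(ssc.graph.batchDuration.milliseconds, TimeUnit.MILLISECONDS)
```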
[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11249#discussion_r53400290

    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -269,6 +270,39 @@ class StreamingContext private[streaming] (
         RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
       }

    + private[streaming] val recoverableAccuNameToId: mutable.Map[String, Long] = {
    --- End diff --

    `mutable.Map[String, Accumulator[_,_]]`? And reset the `Map` on shutdown?