[GitHub] spark issue #19318: [SPARK-22096][ML] use aggregateByKeyLocally in feature f...

2017-09-28 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/19318
  
@VinceShieh can you please mark this PR's title as "[WIP]"?





[GitHub] spark issue #13585: [SPARK-15859][SQL] Optimize the partition pruning within...

2017-06-14 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/13585
  
Oh, yes, I am closing it; I will reopen it when we have another idea.





[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...

2017-06-14 Thread chenghao-intel
Github user chenghao-intel closed the pull request at:

https://github.com/apache/spark/pull/13585





[GitHub] spark issue #17936: [SPARK-20638][Core]Optimize the CartesianRDD to reduce r...

2017-06-04 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/17936
  
I understand that any code change in Spark core is hard to review because of regression concerns, so I think we can leave this PR open for discussion.
1) The `UnsafeCartesianRDD` is not aware of block locality and will re-fetch data from a remote node even when the data has already been fetched by another task on the same node; that is why we had to change some code in `BlockManager`.
2) Some existing RDD-based applications, such as `MLlib`, still use `CartesianRDD`, and we observe a 50x performance boost in ALS prediction. Previously we could not even finish the ALS prediction without this optimization, no matter how much tuning we did.
3) Repeatable data block iteration is probably very useful for new, performance-sensitive API implementations such as a cartesian product for machine learning. Unfortunately `BlockManager` does not provide this capability, and we may add other operations on top of this improvement in the future; that is why we think it is important.
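
To make point 1) concrete, here is a minimal sketch using plain Scala iterators (not the actual `CartesianRDD`/`UnsafeCartesianRDD` source) of the nested-iteration pattern a cartesian product uses; the right-hand partition iterator is rebuilt once per left-hand element, so without a repeatable, locally cached block iterator each rebuild can turn into another remote fetch:

```scala
// Sketch only: `leftPartition` and `rightPartition` are hypothetical stand-ins
// for whatever materializes a partition's iterator (possibly by fetching a block).
def cartesian[A, B](leftPartition: () => Iterator[A],
                    rightPartition: () => Iterator[B]): Iterator[(A, B)] =
  leftPartition().flatMap { a =>
    // Rebuilt for every element `a`; if this triggers a block fetch, the same
    // block may be fetched repeatedly unless it can be replayed locally.
    rightPartition().map(b => (a, b))
  }

// 3 x 2 = 6 pairs, but rightPartition() is invoked 3 times.
val pairs = cartesian(() => Iterator(1, 2, 3), () => Iterator("x", "y")).toList
```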





[GitHub] spark issue #17359: [SPARK-20028][SQL] Add aggreagate expression nGrams

2017-03-24 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/17359
  
@rxin nGram is a built-in UDAF in Hive, and some users complain that they hit performance issues when running queries that use nGram.





[GitHub] spark pull request #16245: [SPARK-18824][SQL] Add optimizer rule to reorder ...

2017-01-21 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/16245#discussion_r97211716
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -514,6 +514,34 @@ case class OptimizeCodegen(conf: CatalystConf) extends Rule[LogicalPlan] {
 
 
 /**
+ * Reorders the predicates in `Filter` so more expensive expressions like UDF can evaluate later.
+ */
+object ReorderPredicatesInFilter extends Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(pred, child) =>
+      // Extracts deterministic suffix expressions from Filter predicate.
+      val expressions = splitConjunctivePredicates(pred)
+      // The beginning index of the deterministic suffix expressions.
+      var splitIndex = -1
+      (expressions.length - 1 to 0 by -1).foreach { idx =>
+        if (splitIndex == -1 && !expressions(idx).deterministic) {
+          splitIndex = idx + 1
+        }
+      }
+      if (splitIndex == expressions.length) {
+        // All expressions are non-deterministic, no reordering.
+        f
+      } else {
+        val (nonDeterminstics, deterministicExprs) = expressions.splitAt(splitIndex)
--- End diff --

Hmm, actually that's what I mean; perhaps there is some confusion between `non-deterministic` and `non-foldable`? I think we can skip both in short-circuit evaluation. Since those expressions are not `stateful` (unfortunately, Spark SQL expressions don't have a concept of `stateful`), skipping their evaluation is harmless, and that is exactly the short-circuit logic of the `AND` expression.





[GitHub] spark issue #16245: [SPARK-18824][SQL] Add optimizer rule to reorder Filter ...

2017-01-21 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/16245
  
I think that is true most of the time for `Scala UDF needs extra conversion between internal format and external format on input and output`, but not all of the time. For example, some built-in string-based operations and their combinations are also quite heavy to evaluate, and most likely this would concern an experienced SQL developer who has already written the SQL expressions in an optimal order (with business-related short-circuiting logic).
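
As a hedged illustration (the toy data, the column name `text`, and the regex are assumptions, not taken from the PR), both conjuncts below are expensive in different ways, so which one should run first is not obvious from "UDF vs. built-in" alone:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("predicate-order-sketch").getOrCreate()
import spark.implicits._

// Assumed toy data, just to make the sketch runnable.
val df = Seq("alpha", "Encyclopaedia", "bounce").toDF("text")

val cheapUdf = udf((s: String) => s != null && s.startsWith("a"))  // pays per-row internal/external conversion
val heavyBuiltin =
  length(regexp_replace(upper(col("text")), "[AEIOU]+", "")) > 10  // no conversion, but a deep expression chain

// An experienced author may already have ordered these conjuncts deliberately.
df.filter(heavyBuiltin && cheapUdf(col("text"))).show()
```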





[GitHub] spark pull request #16245: [SPARK-18824][SQL] Add optimizer rule to reorder ...

2017-01-21 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/16245#discussion_r97211489
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -514,6 +514,34 @@ case class OptimizeCodegen(conf: CatalystConf) extends Rule[LogicalPlan] {
 
 
 /**
+ * Reorders the predicates in `Filter` so more expensive expressions like UDF can evaluate later.
+ */
+object ReorderPredicatesInFilter extends Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(pred, child) =>
+      // Extracts deterministic suffix expressions from Filter predicate.
+      val expressions = splitConjunctivePredicates(pred)
+      // The beginning index of the deterministic suffix expressions.
+      var splitIndex = -1
+      (expressions.length - 1 to 0 by -1).foreach { idx =>
+        if (splitIndex == -1 && !expressions(idx).deterministic) {
+          splitIndex = idx + 1
+        }
+      }
+      if (splitIndex == expressions.length) {
+        // All expressions are non-deterministic, no reordering.
+        f
+      } else {
+        val (nonDeterminstics, deterministicExprs) = expressions.splitAt(splitIndex)
--- End diff --

I mean `(rand() > 0) && b` should be equivalent to `b && (rand() > 0)`, and the latter probably even performs better, thanks to the short-circuit evaluation of `AND`, doesn't it?
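
A minimal sketch in plain Scala (not Catalyst) of that short-circuit point: with `&&`, putting the cheap deterministic conjunct first means the random predicate is only evaluated for rows where it can still matter.

```scala
import scala.util.Random

var randCalls = 0
def noisyRand(): Double = { randCalls += 1; Random.nextDouble() }

val rows = Seq(false, false, true) // stand-in for the boolean column `b`

randCalls = 0
rows.count(b => noisyRand() > 0.0 && b)   // rand-first: evaluated for every row
println(s"rand-first: $randCalls calls")  // 3

randCalls = 0
rows.count(b => b && noisyRand() > 0.0)   // b-first: skipped on the false rows
println(s"b-first: $randCalls calls")     // 1
```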





[GitHub] spark issue #16245: [SPARK-18824][SQL] Add optimizer rule to reorder Filter ...

2017-01-21 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/16245
  
Actually, I doubt this is really an optimization, as the assumption that a Scala UDF is slower than built-in (non-UDF) expressions is probably not always true.





[GitHub] spark pull request #16245: [SPARK-18824][SQL] Add optimizer rule to reorder ...

2017-01-21 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/16245#discussion_r97211330
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -514,6 +514,34 @@ case class OptimizeCodegen(conf: CatalystConf) extends Rule[LogicalPlan] {
 
 
 /**
+ * Reorders the predicates in `Filter` so more expensive expressions like UDF can evaluate later.
+ */
+object ReorderPredicatesInFilter extends Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(pred, child) =>
+      // Extracts deterministic suffix expressions from Filter predicate.
+      val expressions = splitConjunctivePredicates(pred)
+      // The beginning index of the deterministic suffix expressions.
+      var splitIndex = -1
+      (expressions.length - 1 to 0 by -1).foreach { idx =>
+        if (splitIndex == -1 && !expressions(idx).deterministic) {
+          splitIndex = idx + 1
+        }
+      }
+      if (splitIndex == expressions.length) {
+        // All expressions are non-deterministic, no reordering.
+        f
+      } else {
+        val (nonDeterminstics, deterministicExprs) = expressions.splitAt(splitIndex)
--- End diff --

I am a little confused about why we need to separate out the `non-deterministic` expressions. Should it be `stateful` or `foldable` instead?





[GitHub] spark issue #16476: [SPARK-19084][SQL][WIP] Implement expression field

2017-01-09 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/16476
  
Since arguments of a different data type are simply ignored, I think we'd better also add an optimization rule in the `Optimizer`.

The same goes for the Python/Scala API support, but we need to confirm with @rxin why we don't need the `field` API.





[GitHub] spark pull request #16476: [SPARK-19084][SQL][WIP] Implement expression fiel...

2017-01-09 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/16476#discussion_r95283107
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -1528,6 +1528,18 @@ object functions {
   def factorial(e: Column): Column = withExpr { Factorial(e.expr) }
 
   /**
+   * Returns the index of str in (str1, str2, ...) list or 0 if not found.
+   * It takes at least 2 parameters, and all parameters' types should be subtypes of AtomicType.
+   *
+   * @group normal_funcs
+   * @since 2.2.0
+   */
+  @scala.annotation.varargs
+  def field(expr1: Column, expr2: Column, exprs: Column*): Column = withExpr {
--- End diff --

@rxin can you explain a little bit why we remove this?





[GitHub] spark pull request #16476: [SPARK-19084][SQL][WIP] Implement expression fiel...

2017-01-09 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/16476#discussion_r95282681
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -340,3 +344,102 @@ object CaseKeyWhen {
 CaseWhen(cases, elseValue)
   }
 }
+
+/**
+ * A function that returns the index of expr in (expr1, expr2, ...) list 
or 0 if not found.
+ * It takes at least 2 parameters, and all parameters should be subtype of 
AtomicType or NullType.
+ * It's also acceptable to give parameters of different types.
+ * If the search string is NULL, the return value is 0 because NULL fails 
equality comparison with any value.
+ * When the paramters have different types, comparing will be done based 
on type firstly,
+ * for example, ''999'' won't be considered equal with 999, no implicit 
cast will be done here.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, expr1, expr2, ...) - Returns the index of expr in 
the expr1, expr2, ... or 0 if not found.",
+  extended = """
+Examples:
+  > SELECT _FUNC_(10, 9, 3, 10, 4);
+   3
+  > SELECT _FUNC_('a', 'b', 'c', 'd', 'a');
+   4
+  > SELECT _FUNC_('999', 'a', 999, 9.99, '999');
+   4
+  """)
+case class Field(children: Seq[Expression]) extends Expression {
+
+  /** Even if expr is not found in (expr1, expr2, ...) list, the value 
will be 0, not null */
+  override def nullable: Boolean = false
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  private lazy val ordering = 
TypeUtils.getInterpretedOrdering(children(0).dataType)
+
+  private val dataTypeMatchIndex: Seq[Int] = children.tail.zip(Stream from 
1).filter(
+_._1.dataType == children.head.dataType).map(_._2)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+if (children.length <= 1) {
+  TypeCheckResult.TypeCheckFailure(s"FIELD requires at least 2 
arguments")
+} else if (!children.forall(
+e => e.dataType.isInstanceOf[AtomicType] || 
e.dataType.isInstanceOf[NullType])) {
+  TypeCheckResult.TypeCheckFailure(s"FIELD requires all arguments to 
be of AtomicType")
+} else
+  TypeCheckResult.TypeCheckSuccess
+  }
+
+  override def dataType: DataType = IntegerType
+  override def eval(input: InternalRow): Any = {
+val target = children.head.eval(input)
+val targetDataType = children.head.dataType
+@tailrec def findEqual(index: Int): Int = {
+  if (index == dataTypeMatchIndex.size) {
+0
+  } else {
+val value = children(dataTypeMatchIndex(index)).eval(input)
+if (value != null && ordering.equiv(target, value))
--- End diff --

use braces. see https://github.com/databricks/scala-style-guide#curly





[GitHub] spark pull request #16476: [SPARK-19084][SQL][WIP] Implement expression fiel...

2017-01-09 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/16476#discussion_r95282465
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -340,3 +344,102 @@ object CaseKeyWhen {
 CaseWhen(cases, elseValue)
   }
 }
+
+/**
+ * A function that returns the index of expr in (expr1, expr2, ...) list 
or 0 if not found.
+ * It takes at least 2 parameters, and all parameters should be subtype of 
AtomicType or NullType.
+ * It's also acceptable to give parameters of different types.
+ * If the search string is NULL, the return value is 0 because NULL fails 
equality comparison with any value.
+ * When the paramters have different types, comparing will be done based 
on type firstly,
+ * for example, ''999'' won't be considered equal with 999, no implicit 
cast will be done here.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, expr1, expr2, ...) - Returns the index of expr in 
the expr1, expr2, ... or 0 if not found.",
+  extended = """
+Examples:
+  > SELECT _FUNC_(10, 9, 3, 10, 4);
+   3
+  > SELECT _FUNC_('a', 'b', 'c', 'd', 'a');
+   4
+  > SELECT _FUNC_('999', 'a', 999, 9.99, '999');
+   4
+  """)
+case class Field(children: Seq[Expression]) extends Expression {
+
+  /** Even if expr is not found in (expr1, expr2, ...) list, the value 
will be 0, not null */
+  override def nullable: Boolean = false
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  private lazy val ordering = 
TypeUtils.getInterpretedOrdering(children(0).dataType)
+
+  private val dataTypeMatchIndex: Seq[Int] = children.tail.zip(Stream from 
1).filter(
+_._1.dataType == children.head.dataType).map(_._2)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+if (children.length <= 1) {
+  TypeCheckResult.TypeCheckFailure(s"FIELD requires at least 2 
arguments")
+} else if (!children.forall(
+e => e.dataType.isInstanceOf[AtomicType] || 
e.dataType.isInstanceOf[NullType])) {
+  TypeCheckResult.TypeCheckFailure(s"FIELD requires all arguments to 
be of AtomicType")
+} else
+  TypeCheckResult.TypeCheckSuccess
+  }
+
+  override def dataType: DataType = IntegerType
+  override def eval(input: InternalRow): Any = {
+val target = children.head.eval(input)
+val targetDataType = children.head.dataType
+@tailrec def findEqual(index: Int): Int = {
+  if (index == dataTypeMatchIndex.size) {
--- End diff --

if `dataTypeMatchIndex` is `Array[Int]`, then we'd better use 
`dataTypeMatchIndex.length` instead.





[GitHub] spark pull request #16476: [SPARK-19084][SQL][WIP] Implement expression fiel...

2017-01-09 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/16476#discussion_r95282270
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -340,3 +344,102 @@ object CaseKeyWhen {
 CaseWhen(cases, elseValue)
   }
 }
+
+/**
+ * A function that returns the index of expr in (expr1, expr2, ...) list 
or 0 if not found.
+ * It takes at least 2 parameters, and all parameters should be subtype of 
AtomicType or NullType.
+ * It's also acceptable to give parameters of different types.
+ * If the search string is NULL, the return value is 0 because NULL fails 
equality comparison with any value.
+ * When the paramters have different types, comparing will be done based 
on type firstly,
+ * for example, ''999'' won't be considered equal with 999, no implicit 
cast will be done here.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, expr1, expr2, ...) - Returns the index of expr in 
the expr1, expr2, ... or 0 if not found.",
+  extended = """
+Examples:
+  > SELECT _FUNC_(10, 9, 3, 10, 4);
+   3
+  > SELECT _FUNC_('a', 'b', 'c', 'd', 'a');
+   4
+  > SELECT _FUNC_('999', 'a', 999, 9.99, '999');
+   4
+  """)
+case class Field(children: Seq[Expression]) extends Expression {
+
+  /** Even if expr is not found in (expr1, expr2, ...) list, the value 
will be 0, not null */
+  override def nullable: Boolean = false
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  private lazy val ordering = 
TypeUtils.getInterpretedOrdering(children(0).dataType)
+
+  private val dataTypeMatchIndex: Seq[Int] = children.tail.zip(Stream from 
1).filter(
+_._1.dataType == children.head.dataType).map(_._2)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+if (children.length <= 1) {
+  TypeCheckResult.TypeCheckFailure(s"FIELD requires at least 2 
arguments")
+} else if (!children.forall(
+e => e.dataType.isInstanceOf[AtomicType] || 
e.dataType.isInstanceOf[NullType])) {
+  TypeCheckResult.TypeCheckFailure(s"FIELD requires all arguments to 
be of AtomicType")
+} else
+  TypeCheckResult.TypeCheckSuccess
+  }
+
+  override def dataType: DataType = IntegerType
+  override def eval(input: InternalRow): Any = {
+val target = children.head.eval(input)
+val targetDataType = children.head.dataType
--- End diff --

Unused code.





[GitHub] spark pull request #16476: [SPARK-19084][SQL][WIP] Implement expression fiel...

2017-01-09 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/16476#discussion_r95281248
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -340,3 +344,102 @@ object CaseKeyWhen {
 CaseWhen(cases, elseValue)
   }
 }
+
+/**
+ * A function that returns the index of expr in (expr1, expr2, ...) list 
or 0 if not found.
+ * It takes at least 2 parameters, and all parameters should be subtype of 
AtomicType or NullType.
+ * It's also acceptable to give parameters of different types.
+ * If the search string is NULL, the return value is 0 because NULL fails 
equality comparison with any value.
+ * When the paramters have different types, comparing will be done based 
on type firstly,
+ * for example, ''999'' won't be considered equal with 999, no implicit 
cast will be done here.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, expr1, expr2, ...) - Returns the index of expr in 
the expr1, expr2, ... or 0 if not found.",
+  extended = """
+Examples:
+  > SELECT _FUNC_(10, 9, 3, 10, 4);
+   3
+  > SELECT _FUNC_('a', 'b', 'c', 'd', 'a');
+   4
+  > SELECT _FUNC_('999', 'a', 999, 9.99, '999');
+   4
+  """)
+case class Field(children: Seq[Expression]) extends Expression {
+
+  /** Even if expr is not found in (expr1, expr2, ...) list, the value 
will be 0, not null */
+  override def nullable: Boolean = false
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  private lazy val ordering = 
TypeUtils.getInterpretedOrdering(children(0).dataType)
+
+  private val dataTypeMatchIndex: Seq[Int] = children.tail.zip(Stream from 
1).filter(
--- End diff --

`zip(Stream from 1)`, do we really need it?
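
One possible alternative, sketched here under the same assumptions as the diff above (`children`, `dataType`), is `zipWithIndex` plus `collect`; it also yields the `Array[Int]` that another comment in this thread asks for, and `i + 1` keeps the 1-based positions FIELD returns:

```scala
private val dataTypeMatchIndex: Array[Int] =
  children.tail.zipWithIndex.collect {
    // zipWithIndex is 0-based and skips the search expression at children.head,
    // so the original argument position is i + 1.
    case (child, i) if child.dataType == children.head.dataType => i + 1
  }.toArray
```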





[GitHub] spark pull request #16476: [SPARK-19084][SQL][WIP] Implement expression fiel...

2017-01-09 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/16476#discussion_r95281159
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -340,3 +344,102 @@ object CaseKeyWhen {
 CaseWhen(cases, elseValue)
   }
 }
+
+/**
+ * A function that returns the index of expr in (expr1, expr2, ...) list 
or 0 if not found.
+ * It takes at least 2 parameters, and all parameters should be subtype of 
AtomicType or NullType.
+ * It's also acceptable to give parameters of different types.
+ * If the search string is NULL, the return value is 0 because NULL fails 
equality comparison with any value.
+ * When the paramters have different types, comparing will be done based 
on type firstly,
+ * for example, ''999'' won't be considered equal with 999, no implicit 
cast will be done here.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, expr1, expr2, ...) - Returns the index of expr in 
the expr1, expr2, ... or 0 if not found.",
+  extended = """
+Examples:
+  > SELECT _FUNC_(10, 9, 3, 10, 4);
+   3
+  > SELECT _FUNC_('a', 'b', 'c', 'd', 'a');
+   4
+  > SELECT _FUNC_('999', 'a', 999, 9.99, '999');
+   4
+  """)
+case class Field(children: Seq[Expression]) extends Expression {
+
+  /** Even if expr is not found in (expr1, expr2, ...) list, the value 
will be 0, not null */
+  override def nullable: Boolean = false
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  private lazy val ordering = 
TypeUtils.getInterpretedOrdering(children(0).dataType)
+
+  private val dataTypeMatchIndex: Seq[Int] = children.tail.zip(Stream from 
1).filter(
+_._1.dataType == children.head.dataType).map(_._2)
--- End diff --

`_._1.dataType.sameType(children.head.dataType)`?





[GitHub] spark pull request #16476: [SPARK-19084][SQL][WIP] Implement expression fiel...

2017-01-09 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/16476#discussion_r95281046
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -340,3 +344,102 @@ object CaseKeyWhen {
 CaseWhen(cases, elseValue)
   }
 }
+
+/**
+ * A function that returns the index of expr in (expr1, expr2, ...) list 
or 0 if not found.
+ * It takes at least 2 parameters, and all parameters should be subtype of 
AtomicType or NullType.
+ * It's also acceptable to give parameters of different types.
+ * If the search string is NULL, the return value is 0 because NULL fails 
equality comparison with any value.
+ * When the paramters have different types, comparing will be done based 
on type firstly,
+ * for example, ''999'' won't be considered equal with 999, no implicit 
cast will be done here.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, expr1, expr2, ...) - Returns the index of expr in 
the expr1, expr2, ... or 0 if not found.",
+  extended = """
+Examples:
+  > SELECT _FUNC_(10, 9, 3, 10, 4);
+   3
+  > SELECT _FUNC_('a', 'b', 'c', 'd', 'a');
+   4
+  > SELECT _FUNC_('999', 'a', 999, 9.99, '999');
+   4
+  """)
+case class Field(children: Seq[Expression]) extends Expression {
+
+  /** Even if expr is not found in (expr1, expr2, ...) list, the value 
will be 0, not null */
+  override def nullable: Boolean = false
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  private lazy val ordering = 
TypeUtils.getInterpretedOrdering(children(0).dataType)
+
+  private val dataTypeMatchIndex: Seq[Int] = children.tail.zip(Stream from 
1).filter(
--- End diff --

`Array[Int]` instead? A `Seq[Int]` is probably a linked list in its concrete implementation.





[GitHub] spark issue #16476: [SPARK-19084][SQL] Implement expression field

2017-01-09 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/16476
  
@gczsjdy can you please add [WIP] to the title until you feel the code is ready for review?





[GitHub] spark pull request #16476: [SPARK-19084][SQL] Implement expression field

2017-01-08 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/16476#discussion_r95080769
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -340,3 +341,91 @@ object CaseKeyWhen {
 CaseWhen(cases, elseValue)
   }
 }
+
+/**
+ * A function that returns the index of str in (str1, str2, ...) list or 0 
if not found.
+ * It takes at least 2 parameters, and all parameters' types should be 
subtypes of AtomicType.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(str, str1, str2, ...) - Returns the index of str in the 
str1,str2,... or 0 if not found.",
+  extended = """
+Examples:
+  > SELECT _FUNC_(10, 9, 3, 10, 4);
+   3
+  """)
+case class Field(children: Seq[Expression]) extends Expression {
+
+  override def nullable: Boolean = false
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  private lazy val ordering = 
TypeUtils.getInterpretedOrdering(children(0).dataType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+if (children.length <= 1) {
+  TypeCheckResult.TypeCheckFailure(s"FIELD requires at least 2 
arguments")
+} else if (!children.forall(_.dataType.isInstanceOf[AtomicType])) {
+  TypeCheckResult.TypeCheckFailure(s"FIELD requires all arguments to 
be of AtomicType")
+} else
+  TypeCheckResult.TypeCheckSuccess
+  }
+
+  override def dataType: DataType = IntegerType
+
+  override def eval(input: InternalRow): Any = {
+val target = children.head.eval(input)
+val targetDataType = children.head.dataType
+def findEqual(target: Any, params: Seq[Expression], index: Int): Int = 
{
+  params.toList match {
--- End diff --

`toList` probably adds performance overhead; I don't think we have to sacrifice performance just to use pattern matching. Also, I still believe we don't have to check the data type at runtime: it should be done at `compile` time, or only once on the first call to `eval`.

The `Field` evaluation is quite confusing. As @gatorsmile suggested, we need to describe how the value is evaluated when the sub-expressions' data types differ.
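
A sketch of the shape being suggested, reusing names from the diff above (`children`, `ordering`, `InternalRow`) plus an assumed precomputed `dataTypeMatchIndex: Array[Int]` of 1-based positions whose data type matches the search expression: recurse over indices instead of converting `params` to a `List` per row, and leave the type filtering to that one-time computation rather than `eval`.

```scala
import scala.annotation.tailrec

// Assumption: dataTypeMatchIndex is computed once per plan (not once per row)
// and holds the 1-based positions of candidates matching children.head's type.
override def eval(input: InternalRow): Any = {
  val target = children.head.eval(input)
  if (target == null) return 0 // a NULL search value yields 0, per the class doc

  @tailrec
  def findEqual(pos: Int): Int = {
    if (pos >= dataTypeMatchIndex.length) {
      0 // not found
    } else {
      val value = children(dataTypeMatchIndex(pos)).eval(input)
      if (value != null && ordering.equiv(target, value)) dataTypeMatchIndex(pos)
      else findEqual(pos + 1)
    }
  }
  findEqual(0)
}
```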





[GitHub] spark pull request #16476: [SPARK-19084][SQL] Implement expression field

2017-01-08 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/16476#discussion_r95080582
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---
@@ -340,3 +341,91 @@ object CaseKeyWhen {
 CaseWhen(cases, elseValue)
   }
 }
+
+/**
+ * A function that returns the index of str in (str1, str2, ...) list or 0 
if not found.
+ * It takes at least 2 parameters, and all parameters' types should be 
subtypes of AtomicType.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(str, str1, str2, ...) - Returns the index of str in the 
str1,str2,... or 0 if not found.",
+  extended = """
+Examples:
+  > SELECT _FUNC_(10, 9, 3, 10, 4);
+   3
+  """)
+case class Field(children: Seq[Expression]) extends Expression {
+
+  override def nullable: Boolean = false
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  private lazy val ordering = 
TypeUtils.getInterpretedOrdering(children(0).dataType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+if (children.length <= 1) {
+  TypeCheckResult.TypeCheckFailure(s"FIELD requires at least 2 
arguments")
+} else if (!children.forall(_.dataType.isInstanceOf[AtomicType])) {
+  TypeCheckResult.TypeCheckFailure(s"FIELD requires all arguments to 
be of AtomicType")
+} else
+  TypeCheckResult.TypeCheckSuccess
+  }
+
+  override def dataType: DataType = IntegerType
+
+  override def eval(input: InternalRow): Any = {
+val target = children.head.eval(input)
+val targetDataType = children.head.dataType
+def findEqual(target: Any, params: Seq[Expression], index: Int): Int = 
{
+  params.toList match {
--- End diff --

You can add the `@tailrec` annotation to declare that explicitly.





[GitHub] spark issue #15579: Added support for extra command in front of spark.

2016-10-24 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/15579
  
Oh, thank you @jerryshao, I just noticed you gave input as well. :)





[GitHub] spark issue #15579: Added support for extra command in front of spark.

2016-10-24 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/15579
  
@srowen
Besides `numactl`, we probably also need profiling tools such as `valgrind`, `strace`, and `vtune`, as well as system-call hooks, before the executor process is launched.

I agree that providing the prefixed command via configuration is probably not that secure, but an early code review would tell us whether the community will accept the approach.

@rxin Follow-up commits with unit tests and with standalone and Mesos mode support will be added soon.

@sheepduke please update the PR description and title first.





[GitHub] spark issue #15361: [SPARK-17765][SQL] Support for writing out user-defined ...

2016-10-16 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/15361
  
yes, please go ahead. :)





[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks from di...

2016-09-06 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/10225#discussion_r77748327
  
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -136,7 +136,9 @@ private[spark] class IndexShuffleBlockResolver(
   shuffleId: Int,
   mapId: Int,
   lengths: Array[Long],
-  dataTmp: File): Unit = {
--- End diff --

Do we have to change the code in this function?





[GitHub] spark issue #14366: [SPARK-16732][SQL] Remove unused codes in subexpressionE...

2016-08-31 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/14366
  
Ping @rxin, it seems the upstream has not been updated.





[GitHub] spark issue #12646: [SPARK-14878][SQL] Trim characters string function suppo...

2016-08-31 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/12646
  
I like this PR since it's part of the SQL standard, but there is also another JIRA (https://issues.apache.org/jira/browse/SPARK-17299); maybe we can address that in a follow-up PR. Can you, @kevinyu98?





[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...

2016-08-31 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/12646#discussion_r76966164
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2677,4 +2678,107 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
 data.selectExpr("`part.col1`", "`col.1`"))
 }
   }
+
+  test("TRIM function-BOTH") {
+val ae1 = intercept[AnalysisException]{
+  sql("SELECT TRIM(BOTH 'aa' FROM 'aabcaa')").collect()
--- End diff --

Sorry, ignore this.





[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...

2016-08-31 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/12646#discussion_r76966028
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2677,4 +2678,107 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
 data.selectExpr("`part.col1`", "`col.1`"))
 }
   }
+
+  test("TRIM function-BOTH") {
+val ae1 = intercept[AnalysisException]{
+  sql("SELECT TRIM(BOTH 'aa' FROM 'aabcaa')").collect()
--- End diff --

Nit: Do we really need this test?





[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...

2016-08-31 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/12646#discussion_r76965552
  
--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -476,6 +476,61 @@ public UTF8String trim() {
 }
   }
 
+  /**
+   * Removes all specified trim character either from the beginning or the ending of a string
+   * @param trimChar the trim character
+   */
+  public UTF8String trim (UTF8String trimChar) {
--- End diff --

no space after `trim`





[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...

2016-08-31 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/12646#discussion_r76965110
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---
@@ -1789,6 +1803,133 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 }
   }
 
+  test("TRIM function-BOTH") {
+withTable("trimBoth", "trimStrut") {
+  sql("create table trimBoth (c1 string, c2 char(1), c3 string, c4 
string, " +
+"c5 string, c6 string)")
+  // scalastyle:off
+  sql("insert into trimBoth select 'cc', 'c', ' cccbacc', 
'cccbacc数', '数据砖头', '数'")
+  // scalastyle:on
+  sql("create table trimStrut (c1 struct, c2 
string)")
+  sql("insert into trimStrut values ((100, 'abc'), 'ABC')")
+
+  intercept[AnalysisException] {
+sql("SELECT TRIM('c', C1, 'd') from trimBoth")
+  }
+  intercept[AnalysisException] {
+   sql("SELECT TRIM('cc', C1) from trimBoth").collect
+  }
+  intercept[AnalysisException] {
+   sql("SELECT TRIM(C2, C1) from trimBoth").collect
+  }
+  intercept[AnalysisException] {
+   sql("SELECT TRIM(BOTH C2 FROM C1) from trimBoth").collect
+  }
+  intercept[AnalysisException] {
+sql("select trim(c1,c2) from trimStrut")
+  }
+  intercept[
--- End diff --

typo with `enter` key?





[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...

2016-08-31 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/12646#discussion_r76963822
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala ---
@@ -431,56 +432,233 @@ case class FindInSet(left: Expression, right: 
Expression) extends BinaryExpressi
 }
 
 /**
- * A function that trim the spaces from both ends for the specified string.
+ * A function that trim the spaces or a character from both ends for the 
specified string.
  */
 @ExpressionDescription(
-  usage = "_FUNC_(str) - Removes the leading and trailing space characters 
from str.",
-  extended = "> SELECT _FUNC_('SparkSQL   ');\n 'SparkSQL'")
-case class StringTrim(child: Expression)
-  extends UnaryExpression with String2StringExpression {
+  usage = "_FUNC_(str) - Removes the leading and trailing space characters 
or char from str.",
+  extended = "> SELECT _FUNC_('SparkSQL   ');\n 'SparkSQL'\n" +
+ "> SELECT _FUNC_('S', 'SSparkSQLS');\n 'parkSQL'\n" +
+ "> SELECT _FUNC_(BOTH 'S' FROM 'SSparkSQLS');\n 'parkSQL'\n" +
+ "> SELECT _FUNC_(LEADING 'S' FROM 'SSparkSQLS');\n 
'parkSQLS'\n" +
+ "> SELECT _FUNC_(TRAILING 'S' FROM 'SSparkSQLS');\n 
'SSparkSQL'")
+case class StringTrim(children: Seq[Expression])
+  extends Expression with ImplicitCastInputTypes {
+
+  require (children.size <= 2 && children.nonEmpty,
+"$prettyName requires at least one argument and no more than two.")
+
+  override def dataType: DataType = StringType
+  override def inputTypes: Seq[AbstractDataType] = 
Seq.fill(children.size)(StringType)
 
-  def convert(v: UTF8String): UTF8String = v.trim()
+  override def nullable: Boolean = children.exists(_.nullable)
+  override def foldable: Boolean = children.forall(_.foldable)
 
   override def prettyName: String = "trim"
 
+  override def eval(input: InternalRow): Any = {
+val inputs = children.map(_.eval(input).asInstanceOf[UTF8String])
+if (inputs(0) != null) {
+  if (children.size == 1) {
+return inputs(0).trim()
+  } else if (inputs(1) != null) {
+if (inputs(0).numChars > 1) {
+  throw new AnalysisException(s"Trim character '${inputs(0)}' can 
not be greater than " +
+s"1 character.")
+} else {
+  return inputs(1).trim(inputs(0))
+}
+  }
+}
+null
+  }
+
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-defineCodeGen(ctx, ev, c => s"($c).trim()")
+if (children.size == 2 &&
+   (! children(0).isInstanceOf[Literal] || children(0).toString.length 
> 1)) {
+  throw new AnalysisException(s"The trimming parameter should be 
Literal " +
+s"and only one character.") }
+
+val evals = children.map(_.genCode(ctx))
+val inputs = evals.map { eval =>
+  s"${eval.isNull} ? null : ${eval.value}"
+}
+val getTrimFunction = if (children.size == 1) {
+  s"""UTF8String ${ev.value} = ${inputs(0)}.trim();"""
+} else {
+  s"""UTF8String ${ev.value} = 
${inputs(1)}.trim(${inputs(0)});""".stripMargin
+}
+ev.copy(evals.map(_.code).mkString("\n") +
+s"""
+boolean ${ev.isNull} = false;
+${getTrimFunction};
+if (${ev.value} == null) {
+  ${ev.isNull} = true;
+}
+""")
+  }
+
+  override def sql: String = {
+if (children.size == 1) {
+  val childrenSQL = children.map(_.sql).mkString(", ")
+  s"$prettyName($childrenSQL)"
+} else {
+  val trimSQL = children(0).map(_.sql).mkString(", ")
+  val tarSQL = children(1).map(_.sql).mkString(", ")
+  s"$prettyName($trimSQL, $tarSQL)"
+}
   }
 }
 
 /**
- * A function that trim the spaces from left end for given string.
+ * A function that trim the spaces or a character from left end for given 
string.
  */
 @ExpressionDescription(
   usage = "_FUNC_(str) - Removes the leading space characters from str.",
-  extended = "> SELECT _FUNC_('SparkSQL   ');\n 'SparkSQL   '")
-case class StringTrimLeft(child: Expression)
-  extends UnaryExpression with String2StringExpression {
+  extended = "> SELECT _FUNC_('SparkSQL   ');\n 'SparkSQL   '\n" +
  

[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...

2016-08-31 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/12646#discussion_r76963598
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala ---
@@ -431,56 +432,233 @@ case class FindInSet(left: Expression, right: 
Expression) extends BinaryExpressi
 }
 
 /**
- * A function that trim the spaces from both ends for the specified string.
+ * A function that trim the spaces or a character from both ends for the 
specified string.
  */
 @ExpressionDescription(
-  usage = "_FUNC_(str) - Removes the leading and trailing space characters 
from str.",
-  extended = "> SELECT _FUNC_('SparkSQL   ');\n 'SparkSQL'")
-case class StringTrim(child: Expression)
-  extends UnaryExpression with String2StringExpression {
+  usage = "_FUNC_(str) - Removes the leading and trailing space characters 
or char from str.",
+  extended = "> SELECT _FUNC_('SparkSQL   ');\n 'SparkSQL'\n" +
+ "> SELECT _FUNC_('S', 'SSparkSQLS');\n 'parkSQL'\n" +
+ "> SELECT _FUNC_(BOTH 'S' FROM 'SSparkSQLS');\n 'parkSQL'\n" +
+ "> SELECT _FUNC_(LEADING 'S' FROM 'SSparkSQLS');\n 
'parkSQLS'\n" +
+ "> SELECT _FUNC_(TRAILING 'S' FROM 'SSparkSQLS');\n 
'SSparkSQL'")
+case class StringTrim(children: Seq[Expression])
+  extends Expression with ImplicitCastInputTypes {
+
+  require (children.size <= 2 && children.nonEmpty,
+"$prettyName requires at least one argument and no more than two.")
+
+  override def dataType: DataType = StringType
+  override def inputTypes: Seq[AbstractDataType] = 
Seq.fill(children.size)(StringType)
 
-  def convert(v: UTF8String): UTF8String = v.trim()
+  override def nullable: Boolean = children.exists(_.nullable)
+  override def foldable: Boolean = children.forall(_.foldable)
 
   override def prettyName: String = "trim"
 
+  override def eval(input: InternalRow): Any = {
+val inputs = children.map(_.eval(input).asInstanceOf[UTF8String])
+if (inputs(0) != null) {
+  if (children.size == 1) {
+return inputs(0).trim()
+  } else if (inputs(1) != null) {
+if (inputs(0).numChars > 1) {
+  throw new AnalysisException(s"Trim character '${inputs(0)}' can 
not be greater than " +
+s"1 character.")
+} else {
+  return inputs(1).trim(inputs(0))
+}
+  }
+}
+null
+  }
+
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-defineCodeGen(ctx, ev, c => s"($c).trim()")
+if (children.size == 2 &&
+   (! children(0).isInstanceOf[Literal] || children(0).toString.length 
> 1)) {
+  throw new AnalysisException(s"The trimming parameter should be 
Literal " +
+s"and only one character.") }
+
+val evals = children.map(_.genCode(ctx))
+val inputs = evals.map { eval =>
+  s"${eval.isNull} ? null : ${eval.value}"
+}
+val getTrimFunction = if (children.size == 1) {
+  s"""UTF8String ${ev.value} = ${inputs(0)}.trim();"""
+} else {
+  s"""UTF8String ${ev.value} = 
${inputs(1)}.trim(${inputs(0)});""".stripMargin
+}
+ev.copy(evals.map(_.code).mkString("\n") +
+s"""
+boolean ${ev.isNull} = false;
+${getTrimFunction};
+if (${ev.value} == null) {
+  ${ev.isNull} = true;
+}
+""")
+  }
+
+  override def sql: String = {
+if (children.size == 1) {
+  val childrenSQL = children.map(_.sql).mkString(", ")
+  s"$prettyName($childrenSQL)"
+} else {
+  val trimSQL = children(0).map(_.sql).mkString(", ")
+  val tarSQL = children(1).map(_.sql).mkString(", ")
+  s"$prettyName($trimSQL, $tarSQL)"
+}
   }
 }
 
 /**
- * A function that trim the spaces from left end for given string.
+ * A function that trim the spaces or a character from left end for given 
string.
  */
 @ExpressionDescription(
   usage = "_FUNC_(str) - Removes the leading space characters from str.",
-  extended = "> SELECT _FUNC_('SparkSQL   ');\n 'SparkSQL   '")
-case class StringTrimLeft(child: Expression)
-  extends UnaryExpression with String2StringExpression {
+  extended = "> SELECT _FUNC_('SparkSQL   ');\n 'SparkSQL   '\n" +
  

[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...

2016-08-31 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/12646#discussion_r76963573
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala ---
@@ -431,56 +432,233 @@ case class FindInSet(left: Expression, right: 
Expression) extends BinaryExpressi
 }
 
 /**
- * A function that trim the spaces from both ends for the specified string.
+ * A function that trim the spaces or a character from both ends for the 
specified string.
  */
 @ExpressionDescription(
-  usage = "_FUNC_(str) - Removes the leading and trailing space characters 
from str.",
-  extended = "> SELECT _FUNC_('SparkSQL   ');\n 'SparkSQL'")
-case class StringTrim(child: Expression)
-  extends UnaryExpression with String2StringExpression {
+  usage = "_FUNC_(str) - Removes the leading and trailing space characters 
or char from str.",
+  extended = "> SELECT _FUNC_('SparkSQL   ');\n 'SparkSQL'\n" +
+ "> SELECT _FUNC_('S', 'SSparkSQLS');\n 'parkSQL'\n" +
+ "> SELECT _FUNC_(BOTH 'S' FROM 'SSparkSQLS');\n 'parkSQL'\n" +
+ "> SELECT _FUNC_(LEADING 'S' FROM 'SSparkSQLS');\n 
'parkSQLS'\n" +
+ "> SELECT _FUNC_(TRAILING 'S' FROM 'SSparkSQLS');\n 
'SSparkSQL'")
+case class StringTrim(children: Seq[Expression])
+  extends Expression with ImplicitCastInputTypes {
+
+  require (children.size <= 2 && children.nonEmpty,
+"$prettyName requires at least one argument and no more than two.")
+
+  override def dataType: DataType = StringType
+  override def inputTypes: Seq[AbstractDataType] = 
Seq.fill(children.size)(StringType)
 
-  def convert(v: UTF8String): UTF8String = v.trim()
+  override def nullable: Boolean = children.exists(_.nullable)
+  override def foldable: Boolean = children.forall(_.foldable)
 
   override def prettyName: String = "trim"
 
+  override def eval(input: InternalRow): Any = {
+val inputs = children.map(_.eval(input).asInstanceOf[UTF8String])
+if (inputs(0) != null) {
+  if (children.size == 1) {
+return inputs(0).trim()
+  } else if (inputs(1) != null) {
+if (inputs(0).numChars > 1) {
+  throw new AnalysisException(s"Trim character '${inputs(0)}' can 
not be greater than " +
+s"1 character.")
+} else {
+  return inputs(1).trim(inputs(0))
+}
+  }
+}
+null
+  }
+
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-defineCodeGen(ctx, ev, c => s"($c).trim()")
+if (children.size == 2 &&
+   (! children(0).isInstanceOf[Literal] || children(0).toString.length 
> 1)) {
+  throw new AnalysisException(s"The trimming parameter should be 
Literal " +
+s"and only one character.") }
+
+val evals = children.map(_.genCode(ctx))
+val inputs = evals.map { eval =>
+  s"${eval.isNull} ? null : ${eval.value}"
+}
+val getTrimFunction = if (children.size == 1) {
+  s"""UTF8String ${ev.value} = ${inputs(0)}.trim();"""
+} else {
+  s"""UTF8String ${ev.value} = 
${inputs(1)}.trim(${inputs(0)});""".stripMargin
+}
+ev.copy(evals.map(_.code).mkString("\n") +
+s"""
+boolean ${ev.isNull} = false;
+${getTrimFunction};
+if (${ev.value} == null) {
+  ${ev.isNull} = true;
+}
+""")
+  }
+
+  override def sql: String = {
+if (children.size == 1) {
+  val childrenSQL = children.map(_.sql).mkString(", ")
+  s"$prettyName($childrenSQL)"
+} else {
+  val trimSQL = children(0).map(_.sql).mkString(", ")
+  val tarSQL = children(1).map(_.sql).mkString(", ")
+  s"$prettyName($trimSQL, $tarSQL)"
+}
   }
 }
 
 /**
- * A function that trim the spaces from left end for given string.
+ * A function that trim the spaces or a character from left end for given 
string.
  */
 @ExpressionDescription(
   usage = "_FUNC_(str) - Removes the leading space characters from str.",
-  extended = "> SELECT _FUNC_('SparkSQL   ');\n 'SparkSQL   '")
-case class StringTrimLeft(child: Expression)
-  extends UnaryExpression with String2StringExpression {
+  extended = "> SELECT _FUNC_('SparkSQL   ');\n 'SparkSQL   '\n" +
  

[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...

2016-08-31 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/12646#discussion_r76963406
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 ---
@@ -431,56 +432,233 @@ case class FindInSet(left: Expression, right: 
Expression) extends BinaryExpressi
 }
 
 /**
- * A function that trim the spaces from both ends for the specified string.
+ * A function that trim the spaces or a character from both ends for the 
specified string.
  */
 @ExpressionDescription(
-  usage = "_FUNC_(str) - Removes the leading and trailing space characters 
from str.",
-  extended = "> SELECT _FUNC_('SparkSQL   ');\n 'SparkSQL'")
-case class StringTrim(child: Expression)
-  extends UnaryExpression with String2StringExpression {
+  usage = "_FUNC_(str) - Removes the leading and trailing space characters 
or char from str.",
+  extended = "> SELECT _FUNC_('SparkSQL   ');\n 'SparkSQL'\n" +
+ "> SELECT _FUNC_('S', 'SSparkSQLS');\n 'parkSQL'\n" +
+ "> SELECT _FUNC_(BOTH 'S' FROM 'SSparkSQLS');\n 'parkSQL'\n" +
+ "> SELECT _FUNC_(LEADING 'S' FROM 'SSparkSQLS');\n 
'parkSQLS'\n" +
+ "> SELECT _FUNC_(TRAILING 'S' FROM 'SSparkSQLS');\n 
'SSparkSQL'")
+case class StringTrim(children: Seq[Expression])
+  extends Expression with ImplicitCastInputTypes {
+
+  require (children.size <= 2 && children.nonEmpty,
+"$prettyName requires at least one argument and no more than two.")
+
+  override def dataType: DataType = StringType
+  override def inputTypes: Seq[AbstractDataType] = 
Seq.fill(children.size)(StringType)
 
-  def convert(v: UTF8String): UTF8String = v.trim()
+  override def nullable: Boolean = children.exists(_.nullable)
+  override def foldable: Boolean = children.forall(_.foldable)
 
   override def prettyName: String = "trim"
 
+  override def eval(input: InternalRow): Any = {
+val inputs = children.map(_.eval(input).asInstanceOf[UTF8String])
+if (inputs(0) != null) {
+  if (children.size == 1) {
+return inputs(0).trim()
+  } else if (inputs(1) != null) {
+if (inputs(0).numChars > 1) {
+  throw new AnalysisException(s"Trim character '${inputs(0)}' can 
not be greater than " +
+s"1 character.")
+} else {
+  return inputs(1).trim(inputs(0))
+}
+  }
+}
+null
+  }
+
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-defineCodeGen(ctx, ev, c => s"($c).trim()")
+if (children.size == 2 &&
+   (! children(0).isInstanceOf[Literal] || children(0).toString.length 
> 1)) {
+  throw new AnalysisException(s"The trimming parameter should be 
Literal " +
+s"and only one character.") }
+
+val evals = children.map(_.genCode(ctx))
+val inputs = evals.map { eval =>
+  s"${eval.isNull} ? null : ${eval.value}"
+}
+val getTrimFunction = if (children.size == 1) {
+  s"""UTF8String ${ev.value} = ${inputs(0)}.trim();"""
+} else {
+  s"""UTF8String ${ev.value} = 
${inputs(1)}.trim(${inputs(0)});""".stripMargin
+}
+ev.copy(evals.map(_.code).mkString("\n") +
+s"""
+boolean ${ev.isNull} = false;
+${getTrimFunction};
+if (${ev.value} == null) {
+  ${ev.isNull} = true;
+}
+""")
+  }
+
+  override def sql: String = {
+if (children.size == 1) {
+  val childrenSQL = children.map(_.sql).mkString(", ")
+  s"$prettyName($childrenSQL)"
+} else {
+  val trimSQL = children(0).map(_.sql).mkString(", ")
+  val tarSQL = children(1).map(_.sql).mkString(", ")
+  s"$prettyName($trimSQL, $tarSQL)"
+}
   }
 }
 
 /**
- * A function that trim the spaces from left end for given string.
+ * A function that trim the spaces or a character from left end for given 
string.
  */
 @ExpressionDescription(
   usage = "_FUNC_(str) - Removes the leading space characters from str.",
-  extended = "> SELECT _FUNC_('SparkSQL   ');\n 'SparkSQL   '")
-case class StringTrimLeft(child: Expression)
-  extends UnaryExpression with String2StringExpression {
+  extended = "> SELECT _FUNC_('SparkSQL   ');\n 'SparkSQL   '\n" +
  

[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...

2016-08-31 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/12646#discussion_r76962244
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -501,6 +578,38 @@ public UTF8String trimRight() {
 }
   }
 
+  /**
+   * Removes all specified trim character from the ending of a string
+   * @param trimChar the trim character
+   */
+  public UTF8String trimRight(UTF8String trimChar) {
+int numTrimBytes = trimChar.numBytes;
+if (numTrimBytes == 0) {
+  return this;
+}
+int e = this.numBytes - numTrimBytes;
+// skip all the consecutive matching character in the right side
+// index 'e' points to first no matching byte position in the input 
string from right side.
+// Index 'e' moves the number of bytes of the trimming character first.
+if (e < 0) {
+  e = this.numBytes - 1;
+} else {
+  while (e >= 0 && e == this.rfind(trimChar, e)) {
+e -= numTrimBytes;
+  }
+  if (e >= 0) {
+e += numTrimBytes - 1;
+  }
+}
+
+if (e < 0) {
+  // empty string
+  return UTF8String.fromBytes(new byte[0]);
--- End diff --

`UTF8String.EMPTY_UTF8`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...

2016-08-31 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/12646#discussion_r76962088
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -488,6 +543,28 @@ public UTF8String trimLeft() {
 }
   }
 
+  /**
+   * Removes all specified trim character from the beginning of a string
+   * @param trimChar the trim character
+   */
+  public UTF8String trimLeft(UTF8String trimChar) {
+int numTrimBytes = trimChar.numBytes;
+if (numTrimBytes == 0) {
+  return this;
+}
+int s = 0;
+// skip all the consecutive matching character in the left side
+while(s < this.numBytes && s == this.find(trimChar, s)) {
+  s += numTrimBytes;
+}
+if (s == this.numBytes) {
+  // empty string
+  return UTF8String.fromBytes(new byte[0]);
--- End diff --

`UTF8String.EMPTY_UTF8` instead?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...

2016-08-31 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/12646#discussion_r76961869
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -488,6 +543,28 @@ public UTF8String trimLeft() {
 }
   }
 
+  /**
+   * Removes all specified trim character from the beginning of a string
+   * @param trimChar the trim character
+   */
+  public UTF8String trimLeft(UTF8String trimChar) {
+int numTrimBytes = trimChar.numBytes;
+if (numTrimBytes == 0) {
+  return this;
+}
+int s = 0;
+// skip all the consecutive matching character in the left side
+while(s < this.numBytes && s == this.find(trimChar, s)) {
--- End diff --

Oh, sorry, my bad. After checking the `find` function, it should not be a 
problem here; please ignore my previous comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #12646: [SPARK-14878][SQL] Trim characters string functio...

2016-08-31 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/12646#discussion_r76961503
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -488,6 +543,28 @@ public UTF8String trimLeft() {
 }
   }
 
+  /**
+   * Removes all specified trim character from the beginning of a string
+   * @param trimChar the trim character
+   */
+  public UTF8String trimLeft(UTF8String trimChar) {
+int numTrimBytes = trimChar.numBytes;
+if (numTrimBytes == 0) {
+  return this;
+}
+int s = 0;
+// skip all the consecutive matching character in the left side
+while(s < this.numBytes && s == this.find(trimChar, s)) {
--- End diff --

Calling `this.find(trimChar, s)` in a loop will probably cause a performance 
issue. I would suggest re-implementing it directly rather than calling the 
existing function; a rough sketch of what I mean is below.
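
A minimal sketch in Scala on plain byte arrays (illustration only, not the 
actual `UTF8String` code): compare the trim character's bytes in place instead 
of re-running `find` on every iteration.

```scala
// Sketch: left-trim by comparing the trim character's bytes directly.
def trimLeftBytes(str: Array[Byte], trimChar: Array[Byte]): Array[Byte] = {
  val n = trimChar.length
  if (n == 0) return str
  var s = 0
  // advance while the next n bytes equal the trim character's bytes
  while (s + n <= str.length && (0 until n).forall(i => str(s + i) == trimChar(i))) {
    s += n
  }
  if (s == 0) str else java.util.Arrays.copyOfRange(str, s, str.length)
}
```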


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #14481: [WIP][SPARK-16844][SQL] Generate code for sort based agg...

2016-08-17 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/14481
  
@yucai can you please rebase the code?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...

2016-07-25 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/13585#discussion_r72184495
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper {
   .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = 
a.isGenerated)).getOrElse(a)
 }
   }
+
+  /**
+   * Drop the non-partition key expression from the given expression, to 
optimize the
+   * partition pruning. For instances: (We assume part1 & part2 are the 
partition keys):
+   * (part1 == 1 and a > 3) or (part2 == 2 and a < 5)  ==> (part1 == 1 or 
part1 == 2)
+   * (part1 == 1 and a > 3) or (a < 100) => None
+   * (a > 100 && b < 100) or (part1 = 10) => None
+   * (a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or 
part1 == 2)
+   * @param predicate The given expression
+   * @param partitionKeyIds partition keys in attribute set
+   * @return
+   */
+  def extractPartitionKeyExpression(
+predicate: Expression, partitionKeyIds: AttributeSet): 
Option[Expression] = {
+// drop the non-partition key expression in conjunction of the 
expression tree
+val additionalPartPredicate = predicate transformUp {
--- End diff --

I can keep updating the code if we agree on the approach; otherwise, I think 
we'd better close this PR for now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...

2016-07-25 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/13585#discussion_r72184424
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper {
   .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = 
a.isGenerated)).getOrElse(a)
 }
   }
+
+  /**
+   * Drop the non-partition key expression from the given expression, to 
optimize the
+   * partition pruning. For instances: (We assume part1 & part2 are the 
partition keys):
+   * (part1 == 1 and a > 3) or (part2 == 2 and a < 5)  ==> (part1 == 1 or 
part1 == 2)
+   * (part1 == 1 and a > 3) or (a < 100) => None
+   * (a > 100 && b < 100) or (part1 = 10) => None
+   * (a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or 
part1 == 2)
+   * @param predicate The given expression
+   * @param partitionKeyIds partition keys in attribute set
+   * @return
+   */
+  def extractPartitionKeyExpression(
+predicate: Expression, partitionKeyIds: AttributeSet): 
Option[Expression] = {
+// drop the non-partition key expression in conjunction of the 
expression tree
+val additionalPartPredicate = predicate transformUp {
--- End diff --

This PR may have a critical bug when a user implements a UDF that logically 
acts like the `NOT` operator in the partition filter expression (see the sketch 
below). We probably need a whitelist of the built-in UDFs.
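
Illustration in plain Scala (not Spark code; the names are made up): dropping 
the non-partition conjunct inside a negated predicate changes the result, so 
the rewrite can prune a partition that still contains matching rows.

```scala
object NotPruningCounterExample {
  // original predicate: NOT(part1 = 1 AND a > 3)
  def original(part1: Int, a: Int): Boolean = !(part1 == 1 && a > 3)
  // pruning predicate after dropping the non-partition conjunct `a > 3`
  def rewritten(part1: Int): Boolean = !(part1 == 1)

  def main(args: Array[String]): Unit = {
    // A row in partition part1 = 1 with a = 2 satisfies the original predicate,
    // but the rewritten pruning predicate discards that whole partition.
    println(original(part1 = 1, a = 2)) // true  -> the row should be returned
    println(rewritten(part1 = 1))       // false -> the partition is wrongly pruned
  }
}
```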

@yhuai @liancheng @yangw1234 @clockfly any comments on this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14169: [SPARK-16515][SQL]set default record reader and w...

2016-07-17 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/14169#discussion_r71085323
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -1329,7 +1329,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
 
 // SPARK-10310: Special cases LazySimpleSerDe
 val recordHandler = if (name == 
"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
-  Try(conf.getConfString(configKey)).toOption
--- End diff --

The default value is different for each `key`; do you mean to inline the 
`defaultRecordHandler` function?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #14169: [SPARK-16515][SQL]set default record reader and writer f...

2016-07-14 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/14169
  
HiveConf provides the default values 
`org.apache.hadoop.hive.ql.exec.TextRecordReader` and 
`org.apache.hadoop.hive.ql.exec.TextRecordWriter` for the keys 
`hive.script.recordreader` and `hive.script.recordwriter` respectively; 
however, SQLConf doesn't provide those keys, which means the default values 
will be null. This causes the backward incompatibility; a minimal sketch of the 
fallback is below.
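
A minimal sketch of the fallback behaviour I would expect (the helper name and 
the lookup-function signature here are just for illustration):

```scala
import scala.util.Try

object RecordHandlerDefaults {
  // Hive's documented defaults for the two script-transform keys.
  private val hiveDefaults = Map(
    "hive.script.recordreader" -> "org.apache.hadoop.hive.ql.exec.TextRecordReader",
    "hive.script.recordwriter" -> "org.apache.hadoop.hive.ql.exec.TextRecordWriter")

  // Fall back to the Hive default when the conf has no value for the key.
  def recordHandler(getConfString: String => String, key: String): Option[String] =
    Try(getConfString(key)).toOption.orElse(hiveDefaults.get(key))
}
```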



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #14169: [SPARK-16515][SQL]set default record reader and writer f...

2016-07-14 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/14169
  
LGTM.

cc @yhuai @liancheng 
This breaks existing applications that use the default delimiter, and we've 
already verified it in TPCx-BB.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13542: [SPARK-15730][SQL] Respect the --hiveconf in the spark-s...

2016-06-30 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/13542
  
@yhuai I couldn't find any piece of code that copies the `HiveConf` (from 
SessionState) into `SqlConf`; can you confirm this?

Probably that's the reason why --hiveconf doesn't work. A rough sketch of what 
I would expect to exist somewhere is below.
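
Sketch only (the helper and parameter names are assumptions, not actual Spark 
code): push every `--hiveconf` override recorded by the Hive SessionState into 
Spark's SQL conf so downstream code can see it.

```scala
import scala.collection.JavaConverters._

// Copy the overridden Hive configuration entries into the SQL conf via the
// provided setter, so that --hiveconf values become visible to Spark SQL.
def copyHiveConfOverrides(
    overrides: java.util.Map[String, String],
    setSqlConf: (String, String) => Unit): Unit = {
  overrides.asScala.foreach { case (key, value) => setSqlConf(key, value) }
}
```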


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13542: [SPARK-15730][SQL] Respect the --hiveconf in the spark-s...

2016-06-14 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/13542
  
Thanks @jameszhouyi, I've removed the `WIP` from the title.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13542: [SPARK-15730][SQL] Respect the --hiveconf in the ...

2016-06-14 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/13542#discussion_r67083155
  
--- Diff: 
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
 ---
@@ -91,6 +91,8 @@ class CliSuite extends SparkFunSuite with 
BeforeAndAfterAll with Logging {
  |  --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$jdbcUrl
  |  --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
  |  --hiveconf ${ConfVars.SCRATCHDIR}=$scratchDirPath
+ |  --hiveconf conf1=conftest
+ |  --hiveconf conf2=1
--- End diff --

@yhuai any concerns about this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...

2016-06-13 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/13585#discussion_r66743318
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -65,15 +65,20 @@ private[hive] trait HiveStrategies {
 // hive table scan operator to be used for partition pruning.
 val partitionKeyIds = AttributeSet(relation.partitionKeys)
 val (pruningPredicates, otherPredicates) = predicates.partition { 
predicate =>
-  !predicate.references.isEmpty &&
+  predicate.references.nonEmpty &&
   predicate.references.subsetOf(partitionKeyIds)
 }
+val additionalPartPredicates =
+  PhysicalOperation.partitionPrunningFromDisjunction(
+otherPredicates.foldLeft[Expression](Literal(true))(And(_, 
_)), partitionKeyIds)
 
 pruneFilterProject(
   projectList,
   otherPredicates,
   identity[Seq[Expression]],
-  HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) 
:: Nil
+HiveTableScanExec(_,
+relation,
+pruningPredicates ++ additionalPartPredicates)(sparkSession)) 
:: Nil
--- End diff --

Thanks @clockfly for pointing out the exception as well. :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...

2016-06-13 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/13585#discussion_r66743131
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -65,15 +65,20 @@ private[hive] trait HiveStrategies {
 // hive table scan operator to be used for partition pruning.
 val partitionKeyIds = AttributeSet(relation.partitionKeys)
 val (pruningPredicates, otherPredicates) = predicates.partition { 
predicate =>
-  !predicate.references.isEmpty &&
+  predicate.references.nonEmpty &&
   predicate.references.subsetOf(partitionKeyIds)
 }
+val additionalPartPredicates =
+  PhysicalOperation.partitionPrunningFromDisjunction(
+otherPredicates.foldLeft[Expression](Literal(true))(And(_, 
_)), partitionKeyIds)
 
 pruneFilterProject(
   projectList,
   otherPredicates,
   identity[Seq[Expression]],
-  HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) 
:: Nil
+HiveTableScanExec(_,
+relation,
+pruningPredicates ++ additionalPartPredicates)(sparkSession)) 
:: Nil
--- End diff --

Thanks @yangw1234, I will update the code to be stricter about the partition 
pruning filter extraction.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...

2016-06-13 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/13585#discussion_r66742892
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper {
   .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = 
a.isGenerated)).getOrElse(a)
 }
   }
+
+  /**
+   * Drop the non-partition key expression in the disjunctions, to 
optimize the partition pruning.
+   * For instances: (We assume part1 & part2 are the partition keys)
+   * (part1 == 1 and a > 3) or (part2 == 2 and a < 5)  ==> (part1 == 1 or 
part1 == 2)
+   * (part1 == 1 and a > 3) or (a < 100) => None
+   * (a > 100 && b < 100) or (part1 = 10) => None
+   * (a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or 
part1 == 2)
+   * @param predicate disjunctions
+   * @param partitionKeyIds partition keys in attribute set
+   * @return
+   */
+  def partitionPrunningFromDisjunction(
+predicate: Expression, partitionKeyIds: AttributeSet): 
Option[Expression] = {
+// ignore the pure non-partition key expression in conjunction of the 
expression tree
+val additionalPartPredicate = predicate transformUp {
+  case a @ And(left, right) if a.deterministic &&
+left.references.intersect(partitionKeyIds).isEmpty => right
+  case a @ And(left, right) if a.deterministic &&
+right.references.intersect(partitionKeyIds).isEmpty => left
--- End diff --

OK, I got it, thanks for the explanation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...

2016-06-12 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/13585#discussion_r66741771
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -65,15 +65,20 @@ private[hive] trait HiveStrategies {
 // hive table scan operator to be used for partition pruning.
 val partitionKeyIds = AttributeSet(relation.partitionKeys)
 val (pruningPredicates, otherPredicates) = predicates.partition { 
predicate =>
-  !predicate.references.isEmpty &&
+  predicate.references.nonEmpty &&
   predicate.references.subsetOf(partitionKeyIds)
 }
+val additionalPartPredicates =
+  PhysicalOperation.partitionPrunningFromDisjunction(
+otherPredicates.foldLeft[Expression](Literal(true))(And(_, 
_)), partitionKeyIds)
 
 pruneFilterProject(
   projectList,
   otherPredicates,
   identity[Seq[Expression]],
-  HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) 
:: Nil
+HiveTableScanExec(_,
+relation,
+pruningPredicates ++ additionalPartPredicates)(sparkSession)) 
:: Nil
--- End diff --

Sorry @clockfly, I am not quite sure what you mean; this PR is not designed to 
depend on the Optimizer (CNF). Can you please give a more concrete example if 
there is a bug?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...

2016-06-12 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/13585#discussion_r66733226
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -65,15 +65,20 @@ private[hive] trait HiveStrategies {
 // hive table scan operator to be used for partition pruning.
 val partitionKeyIds = AttributeSet(relation.partitionKeys)
 val (pruningPredicates, otherPredicates) = predicates.partition { 
predicate =>
-  !predicate.references.isEmpty &&
+  predicate.references.nonEmpty &&
   predicate.references.subsetOf(partitionKeyIds)
 }
+val additionalPartPredicates =
+  PhysicalOperation.partitionPrunningFromDisjunction(
+otherPredicates.foldLeft[Expression](Literal(true))(And(_, 
_)), partitionKeyIds)
 
 pruneFilterProject(
   projectList,
   otherPredicates,
   identity[Seq[Expression]],
-  HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) 
:: Nil
+HiveTableScanExec(_,
+relation,
+pruningPredicates ++ additionalPartPredicates)(sparkSession)) 
:: Nil
--- End diff --

@yangw1234 @liancheng @clockfly 
`pruningPredicates ++ additionalPartPredicates` is the partition filter, and 
the original filter still needs to be applied after the partitions are pruned.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13585: [SPARK-15859][SQL] Optimize the partition pruning within...

2016-06-12 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/13585
  
Updated with a more meaningful function name and added more unit tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13585: [SPARK-15859][SQL] Optimize the partition pruning within...

2016-06-12 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/13585
  
cc @liancheng 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13542: [SPARK-15730][SQL][WIP] Respect the --hiveconf in...

2016-06-12 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/13542#discussion_r66731077
  
--- Diff: 
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
 ---
@@ -91,6 +91,8 @@ class CliSuite extends SparkFunSuite with 
BeforeAndAfterAll with Logging {
  |  --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$jdbcUrl
  |  --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
  |  --hiveconf ${ConfVars.SCRATCHDIR}=$scratchDirPath
+ |  --hiveconf conf1=conftest
+ |  --hiveconf conf2=1
--- End diff --

Yes, it works; that's the intention, right?

But it seems the code below in `SparkSQLCliDriver` will not work as we expect.
```scala
  if (key != "javax.jdo.option.ConnectionURL") {
conf.set(key, value)
sessionState.getOverriddenConfigurations.put(key, value)
  }
```

Why do we have to ignore the connection URL?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13530: [SPARK-14279][BUILD] Pick the spark version from pom

2016-06-12 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/13530
  
`spark-version-info.properties` cannot be found on my development machine, and 
this causes an NPE while debugging in the IDE. Should we add the default 
version info back for developers like me? A sketch of the fallback I mean is 
below.
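
A minimal sketch (the resource path and property keys here are assumptions): 
load the properties if present, otherwise fill in placeholder values instead of 
failing with an NPE.

```scala
import java.util.Properties

object SparkVersionInfo {
  // Load spark-version-info.properties from the classpath, falling back to
  // placeholder values so an IDE run without the generated file does not NPE.
  def load(): Properties = {
    val props = new Properties()
    val in = getClass.getResourceAsStream("/spark-version-info.properties")
    if (in != null) {
      try props.load(in) finally in.close()
    } else {
      props.setProperty("version", "unknown")
      props.setProperty("revision", "unknown")
    }
    props
  }
}
```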


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...

2016-06-11 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/13585#discussion_r66714698
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper {
   .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = 
a.isGenerated)).getOrElse(a)
 }
   }
+
+  /**
+   * Drop the non-partition key expression in the disjunctions, to 
optimize the partition pruning.
--- End diff --

Oh, OK. Originally I thought the conjunction cases were already handled in 
`collectProjectsAndFilters` before being passed into this function, and that 
here we only handle the `AND` inside the disjunction. (You can see this in 
HiveTableScans in HiveStrategies.scala.)

Anyway, you convinced me. :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...

2016-06-11 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/13585#discussion_r66714358
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -65,15 +65,20 @@ private[hive] trait HiveStrategies {
 // hive table scan operator to be used for partition pruning.
 val partitionKeyIds = AttributeSet(relation.partitionKeys)
 val (pruningPredicates, otherPredicates) = predicates.partition { 
predicate =>
-  !predicate.references.isEmpty &&
+  predicate.references.nonEmpty &&
   predicate.references.subsetOf(partitionKeyIds)
 }
+val additionalPartPredicates =
+  PhysicalOperation.partitionPrunningFromDisjunction(
+otherPredicates.foldLeft[Expression](Literal(true))(And(_, 
_)), partitionKeyIds)
 
 pruneFilterProject(
   projectList,
   otherPredicates,
   identity[Seq[Expression]],
-  HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) 
:: Nil
+HiveTableScanExec(_,
+relation,
+pruningPredicates ++ additionalPartPredicates)(sparkSession)) 
:: Nil
--- End diff --

For `HiveTableScan`, the predicate here is only used to minimize the partition 
scanning, so what we need to do is supply a more specific partition pruning 
predicate; the full filter is still applied afterwards, as the small 
illustration below shows.

Sorry if anything was confusing.
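
Plain-Scala illustration (not Spark code; the data is made up): the weakened, 
partition-only predicate selects a superset of the needed partitions, and 
correctness is preserved because the full predicate is still applied to the 
surviving rows.

```scala
// Rows tagged with their partition value `ds`.
case class Rec(ds: String, key: Int)
val rows = Seq(Rec("3", 22), Rec("3", 99), Rec("4", 38), Rec("1", 5))

// Partition pruning predicate: partition column only, a superset of matches.
val pruned = rows.filter(r => r.ds == "3" || r.ds == "4")

// Full predicate applied to the rows that survive pruning.
val result = pruned.filter(r => (r.ds == "3" && r.key == 22) || (r.ds == "4" && r.key == 38))
// result == Seq(Rec("3", 22), Rec("4", 38))
```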



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...

2016-06-11 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/13585#discussion_r66714324
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper {
   .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = 
a.isGenerated)).getOrElse(a)
 }
   }
+
+  /**
+   * Drop the non-partition key expression in the disjunctions, to 
optimize the partition pruning.
--- End diff --

I think it should be `disjunction`. For example:

`(part=1 and a=1) or (part = 2 and a=4)` is a disjunction, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...

2016-06-11 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/13585#discussion_r66714314
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala ---
@@ -65,4 +69,95 @@ class QueryPartitionSuite extends QueryTest with 
SQLTestUtils with TestHiveSingl
   sql("DROP TABLE IF EXISTS createAndInsertTest")
 }
   }
+
+  test("partition pruning in disjunction") {
+withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "true")) {
+  val testData = sparkContext.parallelize(
+(1 to 10).map(i => TestData(i, i.toString))).toDF()
+  testData.registerTempTable("testData")
+
+  val testData2 = sparkContext.parallelize(
+(11 to 20).map(i => TestData(i, i.toString))).toDF()
+  testData2.registerTempTable("testData2")
+
+  val testData3 = sparkContext.parallelize(
+(21 to 30).map(i => TestData(i, i.toString))).toDF()
+  testData3.registerTempTable("testData3")
+
+  val testData4 = sparkContext.parallelize(
+(31 to 40).map(i => TestData(i, i.toString))).toDF()
+  testData4.registerTempTable("testData4")
+
+  val tmpDir = Files.createTempDir()
+  // create the table for test
+  sql(s"CREATE TABLE table_with_partition(key int,value string) " +
+s"PARTITIONED by (ds string, ds2 string) location 
'${tmpDir.toURI.toString}' ")
+  sql("INSERT OVERWRITE TABLE table_with_partition  partition (ds='1', 
ds2='d1') " +
+"SELECT key,value FROM testData")
+  sql("INSERT OVERWRITE TABLE table_with_partition  partition (ds='2', 
ds2='d1') " +
+"SELECT key,value FROM testData2")
+  sql("INSERT OVERWRITE TABLE table_with_partition  partition (ds='3', 
ds2='d3') " +
+"SELECT key,value FROM testData3")
+  sql("INSERT OVERWRITE TABLE table_with_partition  partition (ds='4', 
ds2='d4') " +
+"SELECT key,value FROM testData4")
+
+  checkAnswer(sql("select key,value from table_with_partition"),
+testData.collect ++ testData2.collect ++ testData3.collect ++ 
testData4.collect)
+
+  checkAnswer(
+sql(
+  """select key,value from table_with_partition
+| where (ds='4' and key=38) or (ds='3' and 
key=22)""".stripMargin),
+  Row(38, "38") :: Row(22, "22") :: Nil)
+
+  checkAnswer(
+sql(
+  """select key,value from table_with_partition
+| where (key<40 and key>38) or (ds='3' and 
key=22)""".stripMargin),
+Row(39, "39") :: Row(22, "22") :: Nil)
+
+  sql("DROP TABLE table_with_partition")
+  sql("DROP TABLE createAndInsertTest")
--- End diff --

good catch. :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13585: [SPARK-15859][SQL] Optimize the partition pruning within...

2016-06-11 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/13585
  
Thank you all for the review, but I am not going to solve CNF; the intention of 
this PR is to extract more partition pruning expressions, so we have fewer 
partitions to scan during the table scan.

But I did find some bugs in this PR and will add more unit tests soon.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...

2016-06-11 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/13585#discussion_r66714297
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper {
   .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = 
a.isGenerated)).getOrElse(a)
 }
   }
+
+  /**
+   * Drop the non-partition key expression in the disjunctions, to 
optimize the partition pruning.
+   * For instances: (We assume part1 & part2 are the partition keys)
+   * (part1 == 1 and a > 3) or (part2 == 2 and a < 5)  ==> (part1 == 1 or 
part1 == 2)
+   * (part1 == 1 and a > 3) or (a < 100) => None
+   * (a > 100 && b < 100) or (part1 = 10) => None
+   * (a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or 
part1 == 2)
+   * @param predicate disjunctions
+   * @param partitionKeyIds partition keys in attribute set
+   * @return
+   */
+  def partitionPrunningFromDisjunction(
+predicate: Expression, partitionKeyIds: AttributeSet): 
Option[Expression] = {
+// ignore the pure non-partition key expression in conjunction of the 
expression tree
+val additionalPartPredicate = predicate transformUp {
+  case a @ And(left, right) if a.deterministic &&
+left.references.intersect(partitionKeyIds).isEmpty => right
+  case a @ And(left, right) if a.deterministic &&
+right.references.intersect(partitionKeyIds).isEmpty => left
--- End diff --

Actually, the output of `!(partition = 1 && a > 3)` should be 
`!(partition=1)`; what should be dropped here is the `a>3`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...

2016-06-09 Thread chenghao-intel
GitHub user chenghao-intel opened a pull request:

https://github.com/apache/spark/pull/13585

[SPARK-15859][SQL] Optimize the partition pruning within the disjunction

## What changes were proposed in this pull request?
Within a disjunction, the partition pruning expression can simply ignore the 
non-partition-key expressions that appear in each conjunct.

For instance:
```scala
(part1 == 1 and a > 3) or (part2 == 2 and a < 5)  ==> (part1 == 1 or part1 
== 2)
(part1 == 1 and a > 3) or (a < 100) => None
(a > 100 && b < 100) or (part1 = 10) => None
(a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or part1 
== 2)
```

This PR only works for HiveTableScan; I will submit another PR to optimize the 
data source API back-end scan. An illustrative query shape that benefits is 
shown below.
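
Illustrative only (assumes a `SparkSession` named `spark` and a Hive table 
`partitioned_table` partitioned by `part1`; none of these names come from the 
PR itself): after the rewrite, only the partitions satisfying 
`part1 = 1 OR part1 = 2` are scanned, and the full predicate is still evaluated 
on the rows that survive pruning.

```scala
spark.sql(
  """SELECT key, value
    |FROM partitioned_table
    |WHERE (part1 = 1 AND a > 3) OR (part1 = 2 AND a < 5)
  """.stripMargin).show()
```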

## How was this patch tested?
The unit test is also included in this PR.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chenghao-intel/spark partition

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/13585.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #13585


commit 08519f2e7a3222cb791e6ce1b8af0c132ff16b29
Author: Cheng Hao <hao.ch...@intel.com>
Date:   2016-06-08T08:48:52Z

optimize the partition pruning




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13542: [SPARK-15730][SQL][WIP] Respect the --hiveconf in the sp...

2016-06-09 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/13542
  
Currently, the Spark SQL CLI ignores the configuration passed from the command 
line via `--hiveconf`; this will break lots of existing applications. It's not 
by design, is it? @yhuai @rxin 

We are still verifying this PR by running TPCx-BB, and will remove the WIP once 
it passes the test.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13542: [SPARK-15730][SQL][WIP] Respect the --hiveconf in the sp...

2016-06-09 Thread chenghao-intel
Github user chenghao-intel commented on the issue:

https://github.com/apache/spark/pull/13542
  
cc @yhuai 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13542: [SPARK-15730][SQL][WIP] Respect the --hiveconf in...

2016-06-07 Thread chenghao-intel
GitHub user chenghao-intel opened a pull request:

https://github.com/apache/spark/pull/13542

[SPARK-15730][SQL][WIP] Respect the --hiveconf in the spark-sql command line

## What changes were proposed in this pull request?

We should respect the --hiveconf options on the spark-sql command line; 
otherwise, existing applications based on Spark 1.6 and earlier will break, as 
the configurations specified via --hiveconf go missing.

## How was this patch tested?
I've added a unit test, but this still needs to be verified with a real 
application.




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chenghao-intel/spark hiveconf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/13542.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #13542


commit bbf0ac4da1278722f82abd38b5cc7b0f94f17a60
Author: Cheng Hao <hao.ch...@intel.com>
Date:   2016-06-07T14:55:27Z

respect the --hiveconf in the SparkSQLCliDriver commandline




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15480][UI][Streaming]show missed InputI...

2016-05-23 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/13259#issuecomment-221166631
  
cc @zsxwing 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14631][SQL][WIP]drop database cascade s...

2016-04-14 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/12391#issuecomment-209937183
  
LGTM except for some minor comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14631][SQL][WIP]drop database cascade s...

2016-04-14 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/12391#discussion_r59711889
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
 ---
@@ -46,4 +48,23 @@ class HiveExternalCatalogSuite extends CatalogTestCases {
 
   protected override def resetState(): Unit = client.reset()
 
+  import utils._
+
+  test("drop database cascade with function defined") {
+import org.apache.spark.sql.catalyst.expressions.Lower
+
+val catalog = newEmptyCatalog()
+val dbName = "dbCascade"
+val path = newUriForDatabase()
+catalog.createDatabase(CatalogDatabase(dbName, "", path, Map.empty), 
ignoreIfExists = false)
+// create a permanent function in catalog
+catalog.createFunction(dbName, CatalogFunction(
+  FunctionIdentifier("func1", Some(dbName)), Lower.getClass.getName, 
Nil))
--- End diff --

Nit: `Lower.getClass` => `classOf[Lower]`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14631][SQL][WIP]drop database cascade s...

2016-04-14 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/12391#discussion_r59711729
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
 ---
@@ -17,20 +17,22 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader}
-import org.apache.spark.util.Utils
 
 /**
  * Test suite for the [[HiveExternalCatalog]].
  */
 class HiveExternalCatalogSuite extends CatalogTestCases {
 
-  private val client: HiveClient = {
+  private lazy val client: HiveClient = {
--- End diff --

Does this have to be `lazy`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14631][SQL][WIP]drop database cascade s...

2016-04-14 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/12391#discussion_r59711674
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
 ---
@@ -17,20 +17,22 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
--- End diff --

Unused import?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-12610][SQL] Add Anti join operators

2016-04-06 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/10563#issuecomment-206693926
  
Closing this PR since it was merged in #12214.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-12610][SQL] Add Anti join operators

2016-04-06 Thread chenghao-intel
Github user chenghao-intel closed the pull request at:

https://github.com/apache/spark/pull/10563


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-12196][Core] Store/retrieve blocks in d...

2016-04-05 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/10225#issuecomment-205712538
  
@JoshRosen I am not sure whether this is still part of your refactorings, or 
whether we can bring this PR back up. It is a quite critical performance 
improvement when mixing PCI-E SSD / HDD, particularly for large-scale data 
shuffling scenarios.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14021][SQL] custom context support for ...

2016-03-28 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/11843#issuecomment-202395548
  
cc @yhuai, this is critical for our own customized `HiveContext`; can you 
please merge this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14021][SQL] custom context support for ...

2016-03-24 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/11843#issuecomment-200900425
  
cc @rxin @liancheng 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14021][SQL][WIP] custom context support...

2016-03-23 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/11843#discussion_r57120180
  
--- Diff: 
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
 ---
@@ -34,6 +34,20 @@ private[hive] object SparkSQLEnv extends Logging {
   var hiveContext: HiveContext = _
   var sparkContext: SparkContext = _
 
+  private def readContextClassFromConf(sparkConf: SparkConf): Class[_ <: 
HiveContext] = {
+val className =
+  sparkConf.get("spark.sql.context.class", 
"org.apache.spark.sql.hive.HiveContext")
--- End diff --

Instead of hard-coding the class name, can we use `classOf[HiveContext].getName()`?
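A hedged sketch of the suggestion (only the default value is the point here; 
the surrounding method body and class-loading call are assumptions for 
illustration):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext

object ContextClassDemo {
  def readContextClassFromConf(sparkConf: SparkConf): Class[_ <: HiveContext] = {
    // classOf[HiveContext].getName avoids hard-coding the fully qualified class name
    val className =
      sparkConf.get("spark.sql.context.class", classOf[HiveContext].getName)
    Class.forName(className).asSubclass(classOf[HiveContext])
  }
}
```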


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13889][YARN] Fix integer overflow when ...

2016-03-15 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/11713#issuecomment-197131091
  
cc @rxin @JoshRosen 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13895][SQL]Change the return type of Da...

2016-03-15 Thread chenghao-intel
GitHub user chenghao-intel opened a pull request:

https://github.com/apache/spark/pull/11731

[SPARK-13895][SQL]Change the return type of DataFrameReader.text

## What changes were proposed in this pull request?
Change the return type of `DataFrameReader.text` from `DataFrame` to 
`Dataset[String]`

## How was this patch tested?
No additional unit test required.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chenghao-intel/spark dfreader

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/11731.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #11731


commit be163d6b353fde70565ff962bd3dea391f932120
Author: Cheng Hao <hao.ch...@intel.com>
Date:   2016-03-15T13:58:04Z

change the return type of DataFrameReader.text from DataFrame to 
Dataset[String]




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13889][YARN] Fix integer overflow when ...

2016-03-15 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/11713#issuecomment-196816685
  
BTW, @carsonwang, can you also describe what would happen, without this change, 
to applications with dynamic allocation enabled? That will help people 
understand the impact of this bug fix.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13894][SQL] SqlContext.range return typ...

2016-03-15 Thread chenghao-intel
GitHub user chenghao-intel opened a pull request:

https://github.com/apache/spark/pull/11730

[SPARK-13894][SQL] SqlContext.range return type from DataFrame to DataSet

## What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-13894
Change the return type of the `range` API from `DataFrame` to `Dataset`.

## How was this patch tested?
No additional unit test required.




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chenghao-intel/spark range

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/11730.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #11730


commit b520eb576f6e49b4c5abe53d1ec7b0800d4a79f9
Author: Cheng Hao <hao.ch...@intel.com>
Date:   2016-03-15T12:26:02Z

SqlContext.range return type from DataFrame to DataSet




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13889][YARN] Fix integer overflow when ...

2016-03-15 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/11713#discussion_r56130697
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -73,7 +73,8 @@ private[spark] class ApplicationMaster(
   } else {
 sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0)
   }
-val defaultMaxNumExecutorFailures = math.max(3, 2 * 
effectiveNumExecutors)
+val defaultMaxNumExecutorFailures = math.max(3,
--- End diff --

Can you add a comment here saying that `effectiveNumExecutors` is 
`Int.MaxValue` by default when dynamic allocation is enabled?
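For readers of the archive, a standalone illustration of the overflow that 
motivates the fix (not the PR's actual change, which is truncated in the 
quoted diff):

```scala
object OverflowDemo extends App {
  val effectiveNumExecutors = Int.MaxValue
  // 2 * Int.MaxValue wraps around to -2, so the max silently becomes 3
  println(math.max(3, 2 * effectiveNumExecutors))   // prints 3
  // Widening to Long before doubling avoids the wrap
  println(math.max(3L, 2L * effectiveNumExecutors)) // prints 4294967294
}
```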


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11517][SQL]Calc partitions in parallel ...

2016-02-26 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/9483#issuecomment-189470296
  
LGTM except some minor suggestions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11517][SQL]Calc partitions in parallel ...

2016-02-26 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/9483#discussion_r54296531
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
 ---
@@ -89,4 +89,25 @@ class HiveTableScanSuite extends HiveComparisonTest {
 assert(sql("select CaseSensitiveColName from spark_4959_2").head() === 
Row("hi"))
 assert(sql("select casesensitivecolname from spark_4959_2").head() === 
Row("hi"))
   }
+
--- End diff --

Remove the extra empty line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11517][SQL]Calc partitions in parallel ...

2016-02-26 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/9483#discussion_r54296448
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.util.concurrent.Callable
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.rdd.{RDD, UnionPartition, UnionRDD}
+import org.apache.spark.util.ThreadUtils
+
+object ParallelUnionRDD {
--- End diff --

Make this `private[hive]` or move it into the upper-level package? The same for 
the class `ParallelUnionRDD`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11517][SQL]Calc partitions in parallel ...

2016-02-26 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/9483#discussion_r54296499
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/ParallelUnionRDD.scala ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.util.concurrent.Callable
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.rdd.{RDD, UnionPartition, UnionRDD}
+import org.apache.spark.util.ThreadUtils
+
+object ParallelUnionRDD {
+  lazy val executorService = ThreadUtils.newDaemonFixedThreadPool(16, 
"ParallelUnionRDD")
+}
+
+class ParallelUnionRDD[T: ClassTag](
+  sc: SparkContext,
+  rdds: Seq[RDD[T]]) extends UnionRDD[T](sc, rdds){
+
+  override def getPartitions: Array[Partition] = {
+// Calc partitions field for each RDD in parallel.
+val rddPartitions = rdds.map {rdd =>
+  (rdd, ParallelUnionRDD.executorService.submit(new 
Callable[Array[Partition]] {
+override def call(): Array[Partition] = rdd.partitions
+  }))
+}.map {case(r, f) => (r, f.get())}
--- End diff --

space before `}` and after `{`
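For illustration, a standalone sketch of the submit-then-join pattern used in 
the quoted `getPartitions` (plain JDK executor and a hypothetical workload), 
formatted with the brace spacing requested here:

```scala
import java.util.concurrent.{Callable, Executors}

object ParallelCalcDemo extends App {
  val pool = Executors.newFixedThreadPool(4)
  val inputs = Seq(1, 2, 3, 4)
  // Submit the per-element computation, then join the futures.
  val results = inputs.map { i =>
    (i, pool.submit(new Callable[Int] { override def call(): Int = i * i }))
  }.map { case (i, f) => (i, f.get()) }
  println(results)
  pool.shutdown()
}
```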


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...

2016-02-25 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/11249#discussion_r54170956
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/AccumulableCheckpoint.scala 
---
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.apache.spark.{Accumulable, AccumulableParam, SparkException}
+
+/**
+  * Store information of Accumulable. We can't checkpoint Accumulable 
dircectly because the
+  * "readObject" method of Accumulable to add extra logic.
+  */
+class AccumulableCheckpoint[R, T] private (
+  val name: String,
+  val value: R,
+  val param: AccumulableParam[R, T]) extends Serializable{
--- End diff --

Space before `{`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...

2016-02-25 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/11249#discussion_r54170867
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
@@ -269,6 +270,33 @@ class StreamingContext private[streaming] (
 RDDOperationScope.withScope(sc, name, allowNesting = false, 
ignoreParent = false)(body)
   }
 
+  private[streaming] val recoverableAccuNameToAcc: mutable.Map[String, 
Accumulable[_, _]] =
+mutable.Map.empty
+
+  /**
+* Different from accumulator in SparkContext, it will first try to 
recover from Checkpoint
+* if it exist.
+*
+* @param initialValue   initial value of accumulator. It will be 
ignored when recovering from
--- End diff --

We don't do alignment for scala doc in Spark.
```scala
@param initialValue Initial value of accumulator. It will be ignored when 
recovering from..
@param name The name is required as identity to find corresponding 
accumulator.
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...

2016-02-25 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/11249#discussion_r54170612
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
@@ -269,6 +270,33 @@ class StreamingContext private[streaming] (
 RDDOperationScope.withScope(sc, name, allowNesting = false, 
ignoreParent = false)(body)
   }
 
+  private[streaming] val recoverableAccuNameToAcc: mutable.Map[String, 
Accumulable[_, _]] =
+mutable.Map.empty
+
+  /**
+* Different from accumulator in SparkContext, it will first try to 
recover from Checkpoint
+* if it exist.
+*
+* @param initialValue   initial value of accumulator. It will be 
ignored when recovering from
+*   checkpoint
+* @param name   name is required as identity to find 
corresponding accumulator.
+*/
+  def getOrCreateRecoverableAccumulator[T](initialValue: T, name: String)
+(implicit param: AccumulatorParam[T]): Accumulator[T] = {
+
+def registerNewAccumulator(_initialV: T) : Accumulator[T] = {
+  val acc = sc.accumulator(_initialV, name)
+  recoverableAccuNameToAcc(name) = acc
+  acc
+}
+
+val newInitialValue: T = if (isCheckpointPresent) {
+  _cp.trackedAccs.find(_.name == 
name).map(_.value).getOrElse(initialValue).asInstanceOf[T]
+} else initialValue
--- End diff --

Nit:
```scala
if (...) {
   ...
} else {
   ...
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13222][Streaming][WIP]make sure latest ...

2016-02-25 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/11101#discussion_r54167619
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
 ---
@@ -123,6 +126,15 @@ class JobGenerator(jobScheduler: JobScheduler) extends 
Logging {
 timedOut
   }
 
+  // generate one more batch to make sure RDD in lastJob is 
checkpointed. As an performance
+  // optimization, if the latest info has been checkpointed in last 
batch, there is no need
+  // to run another round. "isCheckpointMissedLastTime" method here is 
in charge of collect
+  // such information from every DStream recursively.
+  if (!jobScheduler.receiverTracker.hasUnallocatedBlocks &&
+ssc.graph.isCheckpointMissedLastTime) {
+Thread.sleep(ssc.graph.batchDuration.milliseconds)
+  }
--- End diff --

I just wonder if we can add a double check that the last mini batch has 
finished; otherwise, at least we can add a warning log.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...

2016-02-24 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/11249#discussion_r53925855
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
@@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val 
checkpointTime: Time)
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
   val sparkConfPairs = ssc.conf.getAll
+  @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId
--- End diff --

The exception is about `ssc` (the StreamingContext), probably via the 
`recoverableAccuNameToId` property; in the meantime, the calculation of 
`val trackedAccs = ...` should happen on the master node, not on every 
executor. I suspect this exception was caused by something else.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...

2016-02-18 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/11249#discussion_r53419256
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
@@ -269,6 +270,39 @@ class StreamingContext private[streaming] (
 RDDOperationScope.withScope(sc, name, allowNesting = false, 
ignoreParent = false)(body)
   }
 
+  private[streaming] val recoverableAccuNameToId: mutable.Map[String, 
Long] = {
--- End diff --

Yes, it will add strong references; that's why I said we need to reset 
`StreamingContext.recoverableAccuNameToId` in the `stop` / `shutdown` method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...

2016-02-18 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/11249#discussion_r53419167
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
@@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val 
checkpointTime: Time)
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
   val sparkConfPairs = ssc.conf.getAll
+  @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId
--- End diff --

Hmmm, I don't think so; `ssc.` is used everywhere in this class. Can you try 
it? Probably the exception you met was caused by something else.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...

2016-02-18 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/11249#discussion_r53419010
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
@@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val 
checkpointTime: Time)
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
   val sparkConfPairs = ssc.conf.getAll
+  @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId
+
+  // initialize from ssc.context after SPARK-13051
+  val trackedAccs: Array[AccumulableCheckpoint[_, _]] = 
Accumulators.originals.filter(ele =>
--- End diff --

OK, I see. Let's see if we can do something inside `Accumulable.readObject`; 
currently this is more of a hacky workaround.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...

2016-02-18 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/11249#discussion_r53417873
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
@@ -269,6 +270,39 @@ class StreamingContext private[streaming] (
 RDDOperationScope.withScope(sc, name, allowNesting = false, 
ignoreParent = false)(body)
   }
 
+  private[streaming] val recoverableAccuNameToId: mutable.Map[String, 
Long] = {
--- End diff --

I don't think memory space is the problem, as a Java collection always stores 
object references, not duplicated copies of the objects.

The reason I suggested doing that is to simplify the code at 
https://github.com/apache/spark/pull/11249/files#diff-f0064bc8820551c338276e29d922e459R47
 , which means we don't need to look the Accumulator up again by accumulator 
id; in particular, `Accumulators.originals` stores `WeakReference` objects, so 
the lookup can return null.
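A standalone illustration of the `WeakReference` concern (hypothetical payload, 
not Spark's accumulator registry): once the only strong reference is gone, the 
lookup may return null, so callers have to handle that case.

```scala
import java.lang.ref.WeakReference

object WeakRefDemo extends App {
  var payload: Array[Byte] = new Array[Byte](1 << 20)
  val ref = new WeakReference(payload)
  payload = null   // drop the only strong reference
  System.gc()      // only a hint; collection is not guaranteed
  // May print None once the referent has been collected
  println(Option(ref.get()))
}
```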




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...

2016-02-18 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/11249#discussion_r53417573
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
@@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val 
checkpointTime: Time)
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
   val sparkConfPairs = ssc.conf.getAll
+  @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId
+
+  // initialize from ssc.context after SPARK-13051
+  val trackedAccs: Array[AccumulableCheckpoint[_, _]] = 
Accumulators.originals.filter(ele =>
--- End diff --

I just checked the code of `Accumulable.readObject()`; it seems never to be 
used. Were you running into a problem with it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...

2016-02-18 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/11249#discussion_r53417343
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
@@ -41,6 +41,13 @@ class Checkpoint(ssc: StreamingContext, val 
checkpointTime: Time)
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
   val sparkConfPairs = ssc.conf.getAll
+  @transient val recoverableAccuNameToId = ssc.recoverableAccuNameToId
--- End diff --

I am not talking about `@transient`; I mean we can move this inside of 
https://github.com/apache/spark/pull/11249/files#diff-f0064bc8820551c338276e29d922e459R48


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13222][Streaming][WIP]make sure latest ...

2016-02-18 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/11101#discussion_r53417276
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
 ---
@@ -123,6 +126,12 @@ class JobGenerator(jobScheduler: JobScheduler) extends 
Logging {
 timedOut
   }
 
+  // generate one more bacth to make sure RDD in lastJob is 
checkpointed.
+  if (!jobScheduler.receiverTracker.hasUnallocatedBlocks &&
+ssc.graph.isCheckpointMissedLastTime) {
+Thread.sleep(ssc.graph.batchDuration.milliseconds)
--- End diff --

OK. I was wondering whether it would be more graceful to wake this thread up as 
soon as the batch job finishes, rather than sleeping for a fixed length of time.
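A standalone sketch of the idea (plain JDK concurrency with hypothetical names, 
not Spark's scheduler code): have the batch-completion path count down a latch 
and wait on it with the batch duration as an upper bound, so the waiter wakes 
up as soon as the batch finishes.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object GracefulWaitDemo extends App {
  val lastBatchDone = new CountDownLatch(1)
  val batchDurationMs = 1000L

  // Simulates the batch-completion callback firing after 200 ms.
  new Thread(new Runnable {
    override def run(): Unit = { Thread.sleep(200); lastBatchDone.countDown() }
  }).start()

  // Returns as soon as the batch finishes, or after batchDurationMs at the latest.
  val finishedInTime = lastBatchDone.await(batchDurationMs, TimeUnit.MILLISECONDS)
  println(s"batch finished in time: $finishedInTime")
}
```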


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [Spark-13374][Streaming][wip] make it possible...

2016-02-18 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/11249#discussion_r53400290
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
@@ -269,6 +270,39 @@ class StreamingContext private[streaming] (
 RDDOperationScope.withScope(sc, name, allowNesting = false, 
ignoreParent = false)(body)
   }
 
+  private[streaming] val recoverableAccuNameToId: mutable.Map[String, 
Long] = {
--- End diff --

`mutable.Map[String, Accumulator[_, _]]`? And reset the `Map` during shutdown?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


