[GitHub] spark issue #17077: [SPARK-16931][PYTHON][SQL] Add Python wrapper for bucket...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17077
  
**[Test build #73535 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73535/testReport)** for PR 17077 at commit [`ae93166`](https://github.com/apache/spark/commit/ae93166db34d4b3ee784177972e88eea34d4936e).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16557: [SPARK-18693][ML][MLLIB] ML Evaluators should use weight...

2017-02-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16557
  
Merged build finished. Test PASSed.





[GitHub] spark issue #16557: [SPARK-18693][ML][MLLIB] ML Evaluators should use weight...

2017-02-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16557
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73525/





[GitHub] spark issue #17088: [SPARK-19753][CORE] All shuffle files on a host should b...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17088
  
**[Test build #73533 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73533/testReport)** for PR 17088 at commit [`74ca88b`](https://github.com/apache/spark/commit/74ca88bc1d2b67cc12ea32a3cd344ec0259500a9).





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-02-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16867
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73514/





[GitHub] spark issue #17031: [SPARK-19702][MESOS] Add suppress/revive support to the ...

2017-02-27 Thread skonto
Github user skonto commented on the issue:

https://github.com/apache/spark/pull/17031
  
Ok, I see. LGTM.





[GitHub] spark issue #17087: [SPARK-19372][SQL] Fix throwing a Java exception at df.f...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17087
  
**[Test build #73530 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73530/testReport)** for PR 17087 at commit [`6f40a93`](https://github.com/apache/spark/commit/6f40a93cfb21597b214f930d3a5bd9c6645ef227).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...

2017-02-27 Thread kayousterhout
Github user kayousterhout commented on the issue:

https://github.com/apache/spark/pull/16867
  
This looks like a real test failure resulting from this change.





[GitHub] spark issue #13143: [SPARK-15359] [Mesos] Mesos dispatcher should handle DRI...

2017-02-27 Thread mgummelt
Github user mgummelt commented on the issue:

https://github.com/apache/spark/pull/13143
  
The whole function is designed poorly. We need to totally change it 
instead of tacking this on. We shouldn't be calling `driver.run()` in a 
separate thread; we should be calling `driver.start()`.
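For context, a minimal sketch of the two call styles (assuming the standard `org.apache.mesos.SchedulerDriver` API, where `run()` blocks until the driver terminates while `start()` returns immediately; the surrounding method and variable names are illustrative):

```scala
import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler}

def launchDriver(scheduler: Scheduler, framework: Protos.FrameworkInfo, master: String): Unit = {
  val driver = new MesosSchedulerDriver(scheduler, framework, master)

  // Pattern criticized above: run() blocks until the driver stops,
  // so the caller wraps it in a dedicated thread.
  new Thread("mesos-driver") {
    override def run(): Unit = driver.run()
  }.start()

  // Suggested alternative: start() launches the driver on its own
  // internal threads and returns immediately, so no wrapper thread
  // is needed:
  // driver.start()
}
```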





[GitHub] spark issue #13932: [SPARK-15354] [CORE] Topology aware block replication st...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/13932
  
**[Test build #73534 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73534/testReport)** for PR 13932 at commit [`ec601bd`](https://github.com/apache/spark/commit/ec601bd1e619a3f6f35597753954b82536de6bc9).





[GitHub] spark issue #13932: [SPARK-15354] [CORE] Topology aware block replication st...

2017-02-27 Thread shubhamchopra
Github user shubhamchopra commented on the issue:

https://github.com/apache/spark/pull/13932
  
test this please





[GitHub] spark issue #17085: [SPARK-18693][ML][MLLIB] ML Evaluators should use weight...

2017-02-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17085
  
Merged build finished. Test PASSed.





[GitHub] spark pull request #17072: [MINOR][BUILD] Fix lint-java breaks in Java

2017-02-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17072





[GitHub] spark issue #17072: [MINOR][BUILD] Fix lint-java breaks in Java

2017-02-27 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/17072
  
Thank you @srowen.





[GitHub] spark pull request #17082: [SPARK-19749][SS] Name socket source with a meani...

2017-02-27 Thread uncleGen
GitHub user uncleGen opened a pull request:

https://github.com/apache/spark/pull/17082

[SPARK-19749][SS] Name socket source with a meaningful name

## What changes were proposed in this pull request?

Name socket source with a meaningful name

## How was this patch tested?

Jenkins


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uncleGen/spark SPARK-19749

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17082.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17082


commit 68349facee3b33fd5975e90c74c882f3d922
Author: uncleGen 
Date:   2017-02-27T09:35:56Z

Name socket source with a meaningful name







[GitHub] spark issue #17082: [SPARK-19749][SS] Name socket source with a meaningful n...

2017-02-27 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/17082
  
OK, how about other sources? It'd be nice to give them toString methods 
consistently, along with other related classes, if you're bothering to do one.
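As an illustration only (a sketch; this class and its fields are hypothetical, not the PR's actual code), a meaningful `toString` looks like:

```scala
// Hypothetical source-like class with a descriptive toString, instead of
// the default Object.toString (e.g. "TextSocketSource@3c2e1a").
class TextSocketSource(host: String, port: Int) {
  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
}

// new TextSocketSource("localhost", 9999).toString
// returns "TextSocketSource[host: localhost, port: 9999]"
```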





[GitHub] spark issue #16819: [SPARK-16441][YARN] Set maxNumExecutor depends on yarn c...

2017-02-27 Thread tgravescs
Github user tgravescs commented on the issue:

https://github.com/apache/spark/pull/16819
  
I agree with others, this is not the way to do this. There are different 
schedulers in YARN, each with different configs that could affect the actual 
resources you get.

If you want to do something like this, it should look at the available 
resources after making the allocate call to YARN 
(`allocateResponse.getAvailableResources`). When YARN responds, it tells you the 
available resources, which takes into account the various scheduler policies.

MapReduce refers to that as headroom and uses it to determine things like 
whether it needs to kill a reducer to run a map. We could use this to help with 
dynamic allocation and do more intelligent things.
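For illustration, a rough sketch of that suggestion, assuming Hadoop's `AMRMClient` API (the client wiring and the progress value here are placeholders):

```scala
import org.apache.hadoop.yarn.api.records.Resource
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

// After a heartbeat, YARN reports the resources still available to this
// application ("headroom"), already reflecting the active scheduler's
// policies (capacity, fair, queue limits, ...).
def currentHeadroom(amClient: AMRMClient[ContainerRequest]): Resource = {
  val allocateResponse = amClient.allocate(0.1f) // 0.1f is an illustrative progress value
  allocateResponse.getAvailableResources
}
```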





[GitHub] spark issue #16965: [SPARK-18450][ML] Scala API Change for LSH AND-amplifica...

2017-02-27 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/16965
  
GitHub isn't handling the merge well, so you might try rebasing.





[GitHub] spark pull request #17078: [SPARK-19746][ML] Faster indexing for logistic ag...

2017-02-27 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/17078#discussion_r103342317
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala ---
@@ -1431,7 +1431,12 @@ private class LogisticAggregator(
   private var weightSum = 0.0
   private var lossSum = 0.0
 
-  private val gradientSumArray = Array.fill[Double](coefficientSize)(0.0D)
+  @transient private lazy val coefficientsArray = bcCoefficients.value match {
--- End diff --

Can you add the type of `coefficientsArray` so people can clearly know that 
it's a primitive array?
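In other words, a sketch of the requested change (the surrounding class is a stand-in, and the dense-only error handling is written here only for illustration):

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.linalg.{DenseVector, Vector}

class ExampleAggregator(bcCoefficients: Broadcast[Vector]) extends Serializable {
  // The explicit annotation makes it obvious that a primitive
  // Array[Double] is extracted from the broadcast vector.
  @transient private lazy val coefficientsArray: Array[Double] =
    bcCoefficients.value match {
      case DenseVector(values) => values
      case v => throw new IllegalArgumentException(
        s"coefficients only supports dense vector but got ${v.getClass}.")
    }
}
```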





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103340921
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala ---
@@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+plan: LogicalPlan,
+children: Seq[Expression],
+exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+case p: SubqueryExpression =>
+  this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+children.length == p.children.length &&
+children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and 
false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+e.find {
+  case _: ListQuery | _: Exists => true
+  case _ => false
+}.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer 
reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
 e.find {
-  case e: SubqueryExpression if e.children.nonEmpty => true
+  case s: SubqueryExpression if s.children.nonEmpty => true
   case _ => false
 }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e 
outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside 
Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(e: Expression): Boolean = {
+e.find{ x =>
+  x.isInstanceOf[Not] && e.find {
+case In(_, Seq(_: ListQuery)) => true
+case _ => false
+  }.isDefined
+}.isDefined
+  }
+
+  /**
+   * Returns an expression after removing the OuterReference shell.
+   */
+  def stripOuterReference(e: Expression): Expression = {
+e.transform {
+  case OuterReference(r) => r
+}
+  }
+
+  /**
+   * Returns the list of expressions after removing the OuterReference 
shell from each of
+   * the expression.
+   */
+  def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = e.map(stripOuterReference)
+
+  /**
+   * Returns the logical plan after removing the OuterReference shell from 
all the expressions
+   * of the input logical plan.
+   */
+  def stripOuterReferences(p: LogicalPlan): LogicalPlan = {
+p.transformAllExpressions {
+  case OuterReference(a) => a
+}
+  }
+
+  /**
+   * Given a list of expressions, returns the expressions which have outer 
references. Aggregate
+   * expressions are treated in a special way. If the children of 
aggregate expression contains an
+   * outer reference, then the entire aggregate expression is marked as an 
outer reference.
+   * Example (SQL):
+   * {{{
+   *   SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < 
min(b))
+   * }}}
+   * In the above case, we want to mark the entire min(b) as an outer 
reference
+   * OuterReference(min(b)) instead of min(OuterReference(b)).
+   * TODO: Currently we don't allow deep correlation. Also, we don't allow 
mixing of
+   * outer references and local references under an aggregate expression.
+   * For example (SQL):
+   * {{{
+   *   SELECT .. FROM p1
+   *   WHERE EXISTS (SELECT ...
+   * FROM p2
+   * WHERE EXISTS (SELECT ...
+   *   FROM sq
+   *   WHERE min(p1.a + p2.b) = sq.c))
+   *
+   *   

[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r102167233
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1110,31 +1184,24 @@ class Analyzer(
 }
 
 /**
- * Pull out all (outer) correlated predicates from a given subquery. 
This method removes the
- * correlated predicates from subquery [[Filter]]s and adds the 
references of these predicates
- * to all intermediate [[Project]] and [[Aggregate]] clauses (if they 
are missing) in order to
- * be able to evaluate the predicates at the top level.
- *
- * This method returns the rewritten subquery and correlated 
predicates.
+ * Validates to make sure the outer references appearing inside the 
subquery
+ * are legal. This function also returns the list of expressions
+ * that contain outer references. These outer references would be kept 
as children
+ * of subquery expressions by the caller of this function.
  */
-private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
-  val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
--- End diff --

Should we move this into `CheckAnalysis`?





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103339031
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -109,6 +109,26 @@ object TypeCoercion {
   }
 
   /**
+   * This function determines the target type of a comparison operator 
when one operand
+   * is a String and the other is not. It also handles when one op is a 
Date and the
+   * other is a Timestamp by making the target type to be String. 
Currently this is used
+   * to coerce types between LHS and RHS of the IN expression.
+   */
+  val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = {
+case (StringType, DateType) => Some(StringType)
+case (DateType, StringType) => Some(StringType)
+case (StringType, TimestampType) => Some(StringType)
+case (TimestampType, StringType) => Some(StringType)
+case (TimestampType, DateType) => Some(StringType)
--- End diff --

This seems weird. Is this also the current behavior?





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r102167746
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1110,31 +1184,24 @@ class Analyzer(
 }
 
 /**
- * Pull out all (outer) correlated predicates from a given subquery. 
This method removes the
- * correlated predicates from subquery [[Filter]]s and adds the 
references of these predicates
- * to all intermediate [[Project]] and [[Aggregate]] clauses (if they 
are missing) in order to
- * be able to evaluate the predicates at the top level.
- *
- * This method returns the rewritten subquery and correlated 
predicates.
+ * Validates to make sure the outer references appearing inside the 
subquery
+ * are legal. This function also returns the list of expressions
+ * that contain outer references. These outer references would be kept 
as children
+ * of subquery expressions by the caller of this function.
  */
-private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
-  val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
--- End diff --

Also use `foreachUp` instead of `transformUp` for the tree traversal in 
this method.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103340272
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala ---
@@ -123,19 +123,36 @@ case class Not(child: Expression)
  */
 @ExpressionDescription(
usage = "expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN.")
-case class In(value: Expression, list: Seq[Expression]) extends Predicate
-with ImplicitCastInputTypes {
+case class In(value: Expression, list: Seq[Expression]) extends Predicate {
 
   require(list != null, "list should not be null")
-
-  override def inputTypes: Seq[AbstractDataType] = value.dataType +: list.map(_.dataType)
-
   override def checkInputDataTypes(): TypeCheckResult = {
-if (list.exists(l => l.dataType != value.dataType)) {
-  TypeCheckResult.TypeCheckFailure(
-"Arguments must be same type")
-} else {
-  TypeCheckResult.TypeCheckSuccess
+list match {
+  case ListQuery(sub, _, _) :: Nil =>
--- End diff --

IIUC this is all done to get a better error message right?





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103337724
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -2512,3 +2522,67 @@ object ResolveCreateNamedStruct extends 
Rule[LogicalPlan] {
   CreateNamedStruct(children.toList)
   }
 }
+
+/**
+ * The aggregate expressions from subquery referencing outer query block 
are pushed
+ * down to the outer query block for evaluation. This rule below updates 
such outer references
+ * as AttributeReference referring attributes from the parent/outer query 
block.
+ *
+ * For example (SQL):
+ * {{{
+ *   SELECT l.a FROM l GROUP BY 1 HAVING EXISTS (SELECT 1 FROM r WHERE r.d 
< min(l.b))
+ * }}}
+ * Plan before the rule.
+ *Project [a#226]
+ *+- Filter exists#245 [min(b#227)#249]
+ *   :  +- Project [1 AS 1#247]
+ *   : +- Filter (d#238 < min(outer(b#227)))   <-
+ *   :+- SubqueryAlias r
+ *   :   +- Project [_1#234 AS c#237, _2#235 AS d#238]
+ *   :  +- LocalRelation [_1#234, _2#235]
+ *   +- Aggregate [a#226], [a#226, min(b#227) AS min(b#227)#249]
+ *  +- SubqueryAlias l
+ * +- Project [_1#223 AS a#226, _2#224 AS b#227]
+ *+- LocalRelation [_1#223, _2#224]
+ * Plan after the rule.
+ *Project [a#226]
+ *+- Filter exists#245 [min(b#227)#249]
+ *   :  +- Project [1 AS 1#247]
+ *   : +- Filter (d#238 < outer(min(b#227)#249))   <-
+ *   :+- SubqueryAlias r
+ *   :   +- Project [_1#234 AS c#237, _2#235 AS d#238]
+ *   :  +- LocalRelation [_1#234, _2#235]
+ *   +- Aggregate [a#226], [a#226, min(b#227) AS min(b#227)#249]
+ *  +- SubqueryAlias l
+ * +- Project [_1#223 AS a#226, _2#224 AS b#227]
+ *+- LocalRelation [_1#223, _2#224]
+ */
+object UpdateOuterReferences extends Rule[LogicalPlan] {
+  private def stripAlias(expr: Expression): Expression = expr match { case a: Alias => a.child }
+
+  private def updateOuterReferenceInSubquery(
+  plan: LogicalPlan,
+  refExprs: Seq[Expression]): LogicalPlan = {
+plan transformAllExpressions { case e =>
+  val outerAlias =
+refExprs.find(stripAlias(_).semanticEquals(SubExprUtils.stripOuterReference(e)))
+  outerAlias match {
+case Some(a: Alias) => OuterReference(a.toAttribute)
+case _ => e
+  }
+}
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+plan transform {
+  case f @ Filter(_, a: Aggregate) if f.resolved =>
--- End diff --

This only works with aggregates that are already in the `Aggregate` 
operator, this seems like a regression. What does Hive do?





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r102165726
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -707,13 +709,85 @@ class Analyzer(
   } transformUp {
 case other => other transformExpressions {
   case a: Attribute =>
-attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier)
+dedupAttr(a, attributeRewrites)
+  case s: SubqueryExpression =>
+s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
 }
   }
   newRight
   }
 }
 
+private def dedupAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = {
+  attrMap.get(attr).getOrElse(attr).withQualifier(attr.qualifier)
+}
+
+/**
+ * The outer plan may have been de-duplicated and the function below 
updates the
+ * outer references to refer to the de-duplicated attributes.
+ *
+ * For example (SQL):
+ * {{{
+ *   SELECT * FROM t1
+ *   WHERE EXISTS (SELECT 1
+ * FROM t2
+ * WHERE t1.c1 = t2.c1)
+ *   INTERSECT
+ *   SELECT * FROM t1
+ *   WHERE EXISTS (SELECT 1
+ * FROM t2
+ * WHERE t1.c1 = t2.c1)
+ * }}}
+  * Plan before resolveReference rule.
+ *'Intersect
+ *:- 'Project [*]
+ *:  +- Filter exists#271 [c1#250]
+ *: :  +- Project [1 AS 1#295]
+ *: : +- Filter (outer(c1#250) = c1#263)
+ *: :+- SubqueryAlias t2
+ *: :   +- Relation[c1#263,c2#264] parquet
+ *: +- SubqueryAlias t1
+ *:+- Relation[c1#250,c2#251] parquet
+ *+- 'Project [*]
+ *   +- Filter exists#272 [c1#250]
+ *  :  +- Project [1 AS 1#298]
+ *  : +- Filter (outer(c1#250) = c1#263)
+ *  :+- SubqueryAlias t2
+ *  :   +- Relation[c1#263,c2#264] parquet
+ *  +- SubqueryAlias t1
+ * +- Relation[c1#250,c2#251] parquet
+ * Plan after the resolveReference rule.
+ *Intersect
+ *:- Project [c1#250, c2#251]
+ *:  +- Filter exists#271 [c1#250]
+ *: :  +- Project [1 AS 1#295]
+ *: : +- Filter (outer(c1#250) = c1#263)
+ *: :+- SubqueryAlias t2
+ *: :   +- Relation[c1#263,c2#264] parquet
+ *: +- SubqueryAlias t1
+ *:+- Relation[c1#250,c2#251] parquet
+ *+- Project [c1#299, c2#300]
+ *+- Filter exists#272 [c1#299]
--- End diff --

Nit: spacing is off here





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r102256790
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -2332,6 +2337,11 @@ class Analyzer(
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressions {
   case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
 e.withTimeZone(conf.sessionLocalTimeZone)
+  // Casts could be added in the subquery plan through the rule 
TypeCoercion while coercing
+  // the types between the value expression and list query expression 
of IN expression.
+  // We need to subject the subquery plan through ResolveTimeZone 
again to setup timezone
+  // information for time zone aware expressions.
+  case e: ListQuery => e.withNewPlan(ResolveTimeZone.apply(e.plan))
--- End diff --

Nit: just use the `apply` sugar, i.e. `ResolveTimeZone(e.plan)`.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103340692
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala ---
@@ -40,19 +42,179 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
 /**
  * A base interface for expressions that contain a [[LogicalPlan]].
  */
-abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+abstract class SubqueryExpression(
+plan: LogicalPlan,
+children: Seq[Expression],
+exprId: ExprId) extends PlanExpression[LogicalPlan] {
+
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+if (plan.resolved) super.references -- plan.outputSet else super.references
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+  override def semanticEquals(o: Expression): Boolean = o match {
+case p: SubqueryExpression =>
+  this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+children.length == p.children.length &&
+children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+case _ => false
+  }
 }
 
 object SubqueryExpression {
+  /**
+   * Returns true when an expression contains an IN or EXISTS subquery and 
false otherwise.
+   */
+  def hasInOrExistsSubquery(e: Expression): Boolean = {
+e.find {
+  case _: ListQuery | _: Exists => true
+  case _ => false
+}.isDefined
+  }
+
+  /**
+   * Returns true when an expression contains a subquery that has outer 
reference(s). The outer
+   * reference attributes are kept as children of subquery expression by
+   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
+   */
   def hasCorrelatedSubquery(e: Expression): Boolean = {
 e.find {
-  case e: SubqueryExpression if e.children.nonEmpty => true
+  case s: SubqueryExpression if s.children.nonEmpty => true
   case _ => false
 }.isDefined
   }
 }
 
+object SubExprUtils extends PredicateHelper {
+  /**
+   * Returns true when an expression contains correlated predicates i.e 
outer references and
+   * returns false otherwise.
+   */
+  def containsOuter(e: Expression): Boolean = {
+e.find(_.isInstanceOf[OuterReference]).isDefined
+  }
+
+  /**
+   * Returns whether there are any null-aware predicate subqueries inside 
Not. If not, we could
+   * turn the null-aware predicate into not-null-aware predicate.
+   */
+  def hasNullAwarePredicateWithinNot(e: Expression): Boolean = {
+e.find{ x =>
+  x.isInstanceOf[Not] && e.find {
--- End diff --

I am pretty sure this is wrong: because the inner `find` searches all of `e` 
rather than just the `Not`'s subtree, this will also match an expression like 
`NOT(a) AND b IN (SELECT q FROM t)`.
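One possible correction, sketched here (not necessarily the committed fix): scope the inner search to the `Not`'s own subtree instead of the whole expression `e`:

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, In, ListQuery, Not}

def hasNullAwarePredicateWithinNot(e: Expression): Boolean = {
  e.find {
    case Not(child) =>
      // Search only below this Not, so IN-subqueries in other branches
      // of `e` no longer trigger a match.
      child.find {
        case In(_, Seq(_: ListQuery)) => true
        case _ => false
      }.isDefined
    case _ => false
  }.isDefined
}
```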





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103336411
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1398,42 +1399,46 @@ class Analyzer(
 }
   } while (!current.resolved && !current.fastEquals(previous))
 
-  // Step 2: Pull out the predicates if the plan is resolved.
+  // Step 2: pull the outer references and record them as children of 
SubqueryExpression
   if (current.resolved) {
 // Make sure the resolved query has the required number of output 
columns. This is only
 // needed for Scalar and IN subqueries.
if (requiredColumns > 0 && requiredColumns != current.output.size) {
  failAnalysis(s"The number of columns in the subquery (${current.output.size}) " +
    s"does not match the required number of columns ($requiredColumns)")
 }
-// Pullout predicates and construct a new plan.
-f.tupled(rewriteSubQuery(current, plans))
+// Validate the outer reference and record the outer references as 
children of
+// subquery expression.
+f.tupled(current, checkAndGetOuterReferences(current))
--- End diff --

NIT: Do we still need `f.tupled`?





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r102168672
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1110,31 +1184,24 @@ class Analyzer(
 }
 
 /**
- * Pull out all (outer) correlated predicates from a given subquery. 
This method removes the
- * correlated predicates from subquery [[Filter]]s and adds the 
references of these predicates
- * to all intermediate [[Project]] and [[Aggregate]] clauses (if they 
are missing) in order to
- * be able to evaluate the predicates at the top level.
- *
- * This method returns the rewritten subquery and correlated 
predicates.
+ * Validates to make sure the outer references appearing inside the 
subquery
+ * are legal. This function also returns the list of expressions
+ * that contain outer references. These outer references would be kept 
as children
+ * of subquery expressions by the caller of this function.
  */
-private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
-  val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
+  val outerReferences = scala.collection.mutable.ArrayBuffer.empty[Seq[Expression]]
--- End diff --

Nit: `ArrayBuffer.empty[Seq[Expression]]` should also work. Why is it 
useful to collect sequences of expressions instead of just expressions?





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r102168299
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -707,13 +709,85 @@ class Analyzer(
   } transformUp {
 case other => other transformExpressions {
   case a: Attribute =>
-attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier)
+dedupAttr(a, attributeRewrites)
+  case s: SubqueryExpression =>
+s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
 }
   }
   newRight
   }
 }
 
+private def dedupAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = {
+  attrMap.get(attr).getOrElse(attr).withQualifier(attr.qualifier)
+}
+
+/**
+ * The outer plan may have been de-duplicated and the function below 
updates the
+ * outer references to refer to the de-duplicated attributes.
+ *
+ * For example (SQL):
+ * {{{
+ *   SELECT * FROM t1
+ *   WHERE EXISTS (SELECT 1
+ * FROM t2
+ * WHERE t1.c1 = t2.c1)
+ *   INTERSECT
+ *   SELECT * FROM t1
+ *   WHERE EXISTS (SELECT 1
+ * FROM t2
+ * WHERE t1.c1 = t2.c1)
+ * }}}
+  * Plan before resolveReference rule.
--- End diff --

So I am all for decent documentation, but do you think we can find a 
somewhat smaller example?





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r102168200
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1110,31 +1184,24 @@ class Analyzer(
 }
 
 /**
- * Pull out all (outer) correlated predicates from a given subquery. 
This method removes the
- * correlated predicates from subquery [[Filter]]s and adds the 
references of these predicates
- * to all intermediate [[Project]] and [[Aggregate]] clauses (if they 
are missing) in order to
- * be able to evaluate the predicates at the top level.
- *
- * This method returns the rewritten subquery and correlated 
predicates.
+ * Validates to make sure the outer references appearing inside the 
subquery
+ * are legal. This function also returns the list of expressions
+ * that contain outer references. These outer references would be kept 
as children
+ * of subquery expressions by the caller of this function.
  */
-private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
-  val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
+private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
+  val outerReferences = scala.collection.mutable.ArrayBuffer.empty[Seq[Expression]]
 
   // Make sure a plan's subtree does not contain outer references
   def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
-if (p.collectFirst(predicateMap).nonEmpty) {
+if (p.find(SubExprUtils.getOuterReferences(_).nonEmpty).nonEmpty) {
--- End diff --

This might be somewhat expensive. We could also use something that 
terminates early.
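For example, a sketch of an early-terminating check (assuming `containsOuter` from this patch, which already stops at the first `OuterReference` it sees):

```scala
import org.apache.spark.sql.catalyst.expressions.SubExprUtils
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Stops at the first node whose expressions contain any outer reference,
// instead of materializing every outer reference and testing nonEmpty.
def subTreeHasOuterReferences(p: LogicalPlan): Boolean =
  p.find(_.expressions.exists(SubExprUtils.containsOuter)).isDefined
```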





[GitHub] spark pull request #17078: [SPARK-19746][ML] Faster indexing for logistic ag...

2017-02-27 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/17078#discussion_r103343872
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala ---
@@ -1431,7 +1431,12 @@ private class LogisticAggregator(
   private var weightSum = 0.0
   private var lossSum = 0.0
 
-  private val gradientSumArray = Array.fill[Double](coefficientSize)(0.0D)
+  @transient private lazy val coefficientsArray = bcCoefficients.value match {
--- End diff --

Yeah, I'll update it.





[GitHub] spark issue #16965: [SPARK-18450][ML] Scala API Change for LSH AND-amplifica...

2017-02-27 Thread Yunni
Github user Yunni commented on the issue:

https://github.com/apache/spark/pull/16965
  
Looks like the rebase is making it even worse. I will reopen a PR.





[GitHub] spark issue #16965: [SPARK-18450][ML] Scala API Change for LSH AND-amplifica...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16965
  
**[Test build #73538 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73538/testReport)** for PR 16965 at commit [`0b46461`](https://github.com/apache/spark/commit/0b4646199cf061d1f358a78122ef8bdf164ac839).





[GitHub] spark issue #17078: [SPARK-19746][ML] Faster indexing for logistic aggregato...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17078
  
**[Test build #73537 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73537/testReport)** for PR 17078 at commit [`44ee113`](https://github.com/apache/spark/commit/44ee1137efc5e23ffc6a5bfb8dec54d95e7a72e2).





[GitHub] spark pull request #16965: [SPARK-18450][ML] Scala API Change for LSH AND-am...

2017-02-27 Thread Yunni
Github user Yunni closed the pull request at:

https://github.com/apache/spark/pull/16965





[GitHub] spark issue #14273: [SPARK-9140] [ML] Replace TimeTracker by MultiStopwatch

2017-02-27 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/14273
  
OK, apologies @MechCoder for the delay. I guess we can close this issue, 
and someone else can open up a PR based on yours.





[GitHub] spark issue #14273: [SPARK-9140] [ML] Replace TimeTracker by MultiStopwatch

2017-02-27 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/14273
  
Sorry about the delay here.  Do you still have time to work on this?





[GitHub] spark issue #17015: [SPARK-19678][SQL] remove MetastoreRelation

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17015
  
**[Test build #73536 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73536/testReport)** for PR 17015 at commit [`d10bfbc`](https://github.com/apache/spark/commit/d10bfbc71ac30b74222f6794edb5c62ad562f3e5).





[GitHub] spark pull request #17078: [SPARK-19746][ML] Faster indexing for logistic ag...

2017-02-27 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/17078#discussion_r103342591
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala ---
@@ -456,6 +456,32 @@ class LogisticRegressionSuite
 assert(blrModel.intercept !== 0.0)
   }
 
+  test("sparse coefficients in LogisticAggregator") {
+val bcCoefficientsBinary = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0)))
+val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0))
+val binaryAgg = new LogisticAggregator(bcCoefficientsBinary, bcFeaturesStd, 2,
+  fitIntercept = true, multinomial = false)
+val thrownBinary = withClue("binary logistic aggregator cannot handle sparse coefficients") {
--- End diff --

I think we should handle sparse coefficients for further performance 
improvement. But not in this PR.





[GitHub] spark pull request #17078: [SPARK-19746][ML] Faster indexing for logistic ag...

2017-02-27 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/17078#discussion_r103342093
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala ---
@@ -1447,7 +1447,7 @@ private class LogisticAggregator(
   label: Double): Unit = {
 
 val localFeaturesStd = bcFeaturesStd.value
-val localCoefficients = bcCoefficients.value
+val localCoefficients = bcCoefficients.value.toArray
--- End diff --

My concern is that if `coefficients` is sparse, we are not just doing the 
pointer indirection but creating a new dense array from a sparse vector. I know 
we always pass in a dense vector, so this will not be an issue now; but that 
being said, in the following code, if we call `compressed` on the `coefficients`, 
we may be able to broadcast a smaller object when L1 is applied, or in the 
initial iterations when most of the elements in `coefficients` are zero.


https://github.com/sethah/spark/blob/3bea389f6780e1fd0385fbe26954fa4f59b69e37/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala#L1674
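A small sketch of that idea (`Vector.compressed` returns whichever of the dense or sparse encodings is smaller; the sizes here are made up):

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}

// A coefficient vector that is almost entirely zero, as after strong L1
// regularization or in early iterations.
val coefficients: Vector = Vectors.dense(Array.fill(1000)(0.0).updated(3, 2.5))

// compressed yields the sparse encoding here (1 of 1000 entries nonzero),
// so broadcasting `toShip` would send a much smaller object.
val toShip: Vector = coefficients.compressed
```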





[GitHub] spark issue #17079: [SPARK-19748][SQL]refresh function has a wrong order to ...

2017-02-27 Thread windpiger
Github user windpiger commented on the issue:

https://github.com/apache/spark/pull/17079
  
cc @cloud-fan @gatorsmile





[GitHub] spark issue #16959: [SPARK-19631][CORE] OutputCommitCoordinator should not a...

2017-02-27 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/16959
  
Looks OK to me, but let me ping some others: @squito @kayousterhout 





[GitHub] spark issue #17081: [SPARK-18726][SQL][FOLLOW-UP]resolveRelation for FileFor...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17081
  
**[Test build #73539 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73539/testReport)**
 for PR 17081 at commit 
[`f1da0a4`](https://github.com/apache/spark/commit/f1da0a4cf457f4efb6128beca3c08ccf95ef37a0).





[GitHub] spark issue #14273: [SPARK-9140] [ML] Replace TimeTracker by MultiStopwatch

2017-02-27 Thread sethah
Github user sethah commented on the issue:

https://github.com/apache/spark/pull/14273
  
@jkbradley I do not think @MechCoder is working on Spark at the moment.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103346762
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1110,31 +1184,24 @@ class Analyzer(
 }
 
 /**
- * Pull out all (outer) correlated predicates from a given subquery. 
This method removes the
- * correlated predicates from subquery [[Filter]]s and adds the 
references of these predicates
- * to all intermediate [[Project]] and [[Aggregate]] clauses (if they 
are missing) in order to
- * be able to evaluate the predicates at the top level.
- *
- * This method returns the rewritten subquery and correlated 
predicates.
+ * Validates to make sure the outer references appearing inside the 
subquery
+ * are legal. This function also returns the list of expressions
+ * that contain outer references. These outer references would be kept 
as children
+ * of subquery expressions by the caller of this function.
  */
-private def pullOutCorrelatedPredicates(sub: LogicalPlan): 
(LogicalPlan, Seq[Expression]) = {
-  val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, 
Seq[Expression]]
+private def checkAndGetOuterReferences(sub: LogicalPlan): 
Seq[Expression] = {
--- End diff --

@hvanhovell The code here both validates the correlated references and 
collects them, so that the caller can rewrite the outer plan to record the 
outer references. Are you suggesting moving the "check" portion to 
checkAnalysis? I will change it to use foreachUp. Thanks.





[GitHub] spark issue #17012: [SPARK-19677][SS] Renaming a file atop an existing one s...

2017-02-27 Thread hejix
Github user hejix commented on the issue:

https://github.com/apache/spark/pull/17012
  
Just some feedback: I did some initial regression testing with this pull 
request on a full YARN (v2.7.3) 4-node cluster on GCP, and it appears to have 
fixed the two issues we had: our structured streaming drivers now restart 
normally, and our complex aggregation driver runs for the first time.





[GitHub] spark pull request #16954: [SPARK-18874][SQL] First phase: Deferring the cor...

2017-02-27 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/16954#discussion_r103347540
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2332,6 +2337,11 @@ class Analyzer(
 override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveExpressions {
   case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
 e.withTimeZone(conf.sessionLocalTimeZone)
+  // Casts could be added in the subquery plan through the rule 
TypeCoercion while coercing
+  // the types between the value expression and list query expression 
of IN expression.
+  // We need to subject the subquery plan through ResolveTimeZone 
again to setup timezone
+  // information for time zone aware expressions.
+  case e: ListQuery => e.withNewPlan(ResolveTimeZone.apply(e.plan))
--- End diff --

@hvanhovell Thank you. I will change it.





[GitHub] spark issue #17075: [SPARK-19727][SQL] Fix for round function that modifies ...

2017-02-27 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/17075
  
I don't know the code well enough to really evaluate this, but I see that 
`.clone()` is called in a similar context in `decimalExpressions`. There are 
also similar usages of `changePrecision` in `UnsafeArrayWriter` and 
`UnsafeRowWriter`; I wonder if they are affected too?

CC maybe @cloud-fan or @yjshen  ?





[GitHub] spark issue #17082: [SPARK-19749][SS] Name socket source with a meaningful n...

2017-02-27 Thread uncleGen
Github user uncleGen commented on the issue:

https://github.com/apache/spark/pull/17082
  
@srowen I think this is the only source we forgot to name.





[GitHub] spark issue #17071: [SPARK-15615][SQL][BUILD][FOLLOW-UP] Replace deprecated ...

2017-02-27 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/17071
  
I like it. Regarding still testing the deprecated method -- maybe it's best 
to have a test that explicitly exercises the old method? That may be clearer 
than just picking some test from another batch to leave with the old behavior. 
It might mean adding one new small test case to the generic JSON test suite 
for this purpose. What do you think?





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-02-27 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103174824
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -23,7 +23,10 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, Map}
+import scala.util.control.NonFatal
 
+import org.apache.spark.internal.Logging
+import org.apache.spark.TaskNotSerializableException
--- End diff --

I'm sorry, I forgot that. It's done.





[GitHub] spark issue #17059: [SPARK-19733][ML]Removed unnecessary castings and refact...

2017-02-27 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/17059
  
That's compelling regarding performance: the gain is not big, but it's not 
trivial either.
My remaining concern is whether you're handling all the cases the original 
did. `Number` covers a lot, but does it include all the SQL decimal types? I 
think the common casting mechanism would cover those. Granted, it's odd if 
someone tries to use those values, but ideally it would not work differently 
after this change.

I am still not sure about the `Long` case -- don't you want to handle Scala 
Long like Int here for efficiency?





[GitHub] spark pull request #17083: [SPARK-19750][UI][branch-2.1] Fix redirect issue ...

2017-02-27 Thread jerryshao
GitHub user jerryshao opened a pull request:

https://github.com/apache/spark/pull/17083

[SPARK-19750][UI][branch-2.1] Fix redirect issue from http to https

## What changes were proposed in this pull request?

If the Spark UI port (4040) is not set, it will choose port number 0, and 
this will make the HTTPS port also choose 0. The current Spark code then uses 
this HTTPS port (0) for the redirect, so when a redirect is triggered, it 
points to a wrong URL,

like:

```
/tmp/temp$ wget http://172.27.25.134:55015
--2017-02-23 12:13:54--  http://172.27.25.134:55015/
Connecting to 172.27.25.134:55015... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://172.27.25.134:0/ [following]
--2017-02-23 12:13:54--  https://172.27.25.134:0/
Connecting to 172.27.25.134:0... failed: Can't assign requested address.
Retrying.

--2017-02-23 12:13:55--  (try: 2)  https://172.27.25.134:0/
Connecting to 172.27.25.134:0... failed: Can't assign requested address.
Retrying.

--2017-02-23 12:13:57--  (try: 3)  https://172.27.25.134:0/
Connecting to 172.27.25.134:0... failed: Can't assign requested address.
Retrying.

--2017-02-23 12:14:00--  (try: 4)  https://172.27.25.134:0/
Connecting to 172.27.25.134:0... failed: Can't assign requested address.
Retrying.

```

So instead of using 0 for the redirect, we should pick the actually bound 
port.

This issue only exists in Spark 2.1 and earlier, and can be reproduced in 
YARN cluster mode.
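A minimal sketch of the fix idea, with illustrative names (the real change 
lives in Spark's UI/Jetty wiring, not in a helper like this):

```
// Build the redirect Location from the port the HTTPS connector actually
// bound to; the configured port may be 0, which means "pick any free port".
def httpsLocation(host: String, configuredPort: Int, boundPort: Int): String = {
  val port = if (configuredPort == 0) boundPort else configuredPort
  s"https://$host:$port/"
}
```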

## How was this patch tested?

The current redirect UT doesn't catch this issue, so it is extended to verify 
the fix.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jerryshao/apache-spark SPARK-19750

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17083.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17083


commit 5408005912c1e369cbf3d77ea490b88f621ee047
Author: jerryshao 
Date:   2017-02-27T10:36:45Z

Fix redirect issue for https

Change-Id: I5306a7553c230811dcada4d9c205b82b2af77c6e







[GitHub] spark issue #15770: [SPARK-15784][ML]:Add Power Iteration Clustering to spar...

2017-02-27 Thread thunterdb
Github user thunterdb commented on the issue:

https://github.com/apache/spark/pull/15770
  
Note that any of these formats would cause trouble for a graph with a 
high-centrality node (Lady Gaga in the Twitter graph). That being said, in 
order to move things along, I do not have a strong opinion as to which option 
we pick.





[GitHub] spark issue #17039: [SPARK-19710][SQL][TESTS] Fix ordering of rows in query ...

2017-02-27 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/17039
  
@hvanhovell Is it possible the SQL queries are used to verify the behavior 
of ORDER BY? Do you think we should explicitly leave a comment saying that 
SQLQueryTestSuite should not be used for that goal?





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103348444
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
   (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
 }
 
-cachedData.foreach {
-  case data if data.plan.find(lookupAndRefresh(_, fs, 
qualifiedPath)).isDefined =>
-val dataIndex = cachedData.indexWhere(cd => 
data.plan.sameResult(cd.plan))
-if (dataIndex >= 0) {
-  data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking 
= true)
-  cachedData.remove(dataIndex)
-}
-
sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, 
data.plan))
-  case _ => // Do Nothing
+cachedData.filter {
--- End diff --

This kind of collection can't be modified while we are iterating over it: 
some elements are skipped (never iterated over) if we delete/add elements 
during the iteration.
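A minimal sketch of the pitfall, plus the collect-then-remove pattern the 
new code follows:

```
import scala.collection.mutable.ArrayBuffer

val buf = ArrayBuffer(1, 2, 3, 4)

// DON'T: removing while foreach walks the buffer by index shifts the
// remaining elements left, so some are silently skipped (the exact
// behavior is undefined).
// buf.foreach { x => if (x % 2 == 1) buf -= x }

// Safe pattern: decide what to remove first, then remove.
val toRemove = buf.filter(_ % 2 == 1)
toRemove.foreach(buf -= _)
// buf is now ArrayBuffer(2, 4)
```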





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread sueann
Github user sueann commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103349080
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.recommendation
+
+import scala.language.implicitConversions
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.sql.{Encoder, Encoders}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.util.BoundedPriorityQueue
+
+/**
+ * Works on rows of the form (K1, K2, V) where K1 & K2 are IDs and V is 
the score value. Finds
+ * the top `num` K2 items based on the given Ordering.
+ */
+
+private[recommendation] class TopByKeyAggregator[K1: TypeTag, K2: TypeTag, 
V: TypeTag]
--- End diff --

We may want to put this somewhere more general so it can be reused?





[GitHub] spark pull request #17089: [SPARK-19756][SQL] drop the table cache after ins...

2017-02-27 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/17089

[SPARK-19756][SQL] drop the table cache after inserting into a data source 
table

## What changes were proposed in this pull request?

When we insert into a table, we should uncache it to avoid exposing stale 
data. This is the existing behavior for Hive tables (see 
`InsertIntoHiveTable`); this PR fixes the problem for data source tables.
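A minimal sketch of the stale-cache scenario this guards against (the table 
name `t` is illustrative):

```
spark.range(10).write.saveAsTable("t")
spark.table("t").cache()
spark.table("t").count()                           // materializes the cache: 10 rows
spark.sql("INSERT INTO t SELECT * FROM range(10)") // must also drop the cache entry
spark.table("t").count()                           // should see 20 rows, not a stale 10
```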

## How was this patch tested?

new regression test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark minor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17089.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17089









[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread sueann
Github user sueann commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103349139
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.recommendation
+
+import scala.language.implicitConversions
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.sql.{Encoder, Encoders}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.util.BoundedPriorityQueue
+
+/**
+ * Works on rows of the form (K1, K2, V) where K1 & K2 are IDs and V is 
the score value. Finds
+ * the top `num` K2 items based on the given Ordering.
+ */
+
+private[recommendation] class TopByKeyAggregator[K1: TypeTag, K2: TypeTag, 
V: TypeTag]
--- End diff --

(It'd need its own unit tests, though not sure if we'll get everything in 
for 2.2)





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread sueann
GitHub user sueann opened a pull request:

https://github.com/apache/spark/pull/17090

[Spark-19535][ML] RecommendForAllUsers RecommendForAllItems for ALS on 
Dataframe 

## What changes were proposed in this pull request?

This is a simple implementation of RecommendForAllUsers & 
RecommendForAllItems for the DataFrame version of ALS. It uses DataFrame 
operations (not a wrapper on the RDD implementation). I haven't benchmarked 
it against a wrapper, but the unit test examples do work.

## How was this patch tested?

Unit tests
```
$ build/sbt
> mllib/testOnly *ALSSuite -- -z "recommendFor"
> mllib/testOnly
```


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sueann/spark SPARK-19535

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17090.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17090









[GitHub] spark issue #17089: [SPARK-19756][SQL] drop the table cache after inserting ...

2017-02-27 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/17089
  
cc @gatorsmile 





[GitHub] spark issue #17075: [SPARK-19727][SQL] Fix for round function that modifies ...

2017-02-27 Thread wojtek-szymanski
Github user wojtek-szymanski commented on the issue:

https://github.com/apache/spark/pull/17075
  
I have just started refactoring `changePrecision` in order to make it 
immutable.
My idea was to change the signature from:
 `def changePrecision(precision: Int, scale: Int, mode: Int): Boolean`
into
 `def changePrecision(precision: Int, scale: Int, mode: Int): Option[Decimal]`

Here are my first thoughts:
- `org.apache.spark.sql.types.Decimal` is mutable by definition, so making 
one method immutable makes its contract very inconsistent

- I am afraid of performance degradation in micro-benchmarks, since in some 
use cases an instance needs to be created twice

- `changePrecision` is called 10 times in Scala, 10 times in **code gen** 
functions and 3 times in Java **unsafe** writers (`UnsafeArrayWriter`, 
`UnsafeRowWriter`)

I would be grateful if you could confirm whether this is the right way to go.
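For illustration, a hedged sketch of the immutable variant, written as a 
standalone helper: the existing public two-argument `changePrecision` does 
the work on a copy, and `Decimal.clone()` is assumed accessible here (as 
noted above, `decimalExpressions` already calls it in a similar context):

```
import org.apache.spark.sql.types.Decimal

// Work on a clone and return it; the receiver is left untouched.
def changePrecisionCopy(d: Decimal, precision: Int, scale: Int): Option[Decimal] = {
  val copy = d.clone()
  if (copy.changePrecision(precision, scale)) Some(copy) else None
}
```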





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103349345
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
   (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
 }
 
-cachedData.foreach {
-  case data if data.plan.find(lookupAndRefresh(_, fs, 
qualifiedPath)).isDefined =>
-val dataIndex = cachedData.indexWhere(cd => 
data.plan.sameResult(cd.plan))
-if (dataIndex >= 0) {
-  data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking 
= true)
-  cachedData.remove(dataIndex)
-}
-
sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, 
data.plan))
-  case _ => // Do Nothing
+cachedData.filter {
--- End diff --

But we are still modifying it during iteration, after the `filter`. Can you 
be more specific about what the problem is?





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103349429
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.recommendation
+
+import scala.language.implicitConversions
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.sql.{Encoder, Encoders}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.util.BoundedPriorityQueue
+
+/**
+ * Works on rows of the form (K1, K2, V) where K1 & K2 are IDs and V is 
the score value. Finds
+ * the top `num` K2 items based on the given Ordering.
+ */
+
+private[recommendation] class TopByKeyAggregator[K1: TypeTag, K2: TypeTag, 
V: TypeTag]
+  (num: Int, ord: Ordering[(K2, V)])
+  extends Aggregator[(K1, K2, V), BoundedPriorityQueue[(K2, V)], 
Array[(K2, V)]] {
+
+  override def zero: BoundedPriorityQueue[(K2, V)] = new 
BoundedPriorityQueue[(K2, V)](num)(ord)
+  override def reduce(
--- End diff --

I think you need to throw some spaces and braces in here to make it a bit 
more readable?
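For illustration, one way the declaration might be laid out (the body shown 
here is an assumption, since the quoted diff is truncated at `reduce`):

```
override def reduce(
    q: BoundedPriorityQueue[(K2, V)],
    row: (K1, K2, V)): BoundedPriorityQueue[(K2, V)] = {
  q += ((row._2, row._3))  // the queue itself keeps only the top `num` pairs
}
```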





[GitHub] spark pull request #16959: [SPARK-19631][CORE] OutputCommitCoordinator shoul...

2017-02-27 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16959#discussion_r103348105
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -48,25 +48,29 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
   private type StageId = Int
   private type PartitionId = Int
   private type TaskAttemptNumber = Int
+  private case class StageState(
+  authorizedCommitters: Array[TaskAttemptNumber],
+  failures: mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]])
--- End diff --

Why not define failures as a member variable (and initialize it there with an 
empty map), rather than forcing the caller to pass in an empty map?
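A sketch of the suggestion (the `numPartitions` parameter and the `-1` 
sentinel are assumptions here; the `PartitionId`/`TaskAttemptNumber` aliases 
come from the surrounding class):

```
import scala.collection.mutable

private case class StageState(numPartitions: Int) {
  val authorizedCommitters: Array[TaskAttemptNumber] =
    Array.fill(numPartitions)(-1)  // -1 standing in for "no authorized committer"
  val failures: mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]] =
    mutable.Map.empty              // initialized here, not passed in by the caller
}
```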





[GitHub] spark pull request #17079: [SPARK-19748][SQL]refresh function has a wrong or...

2017-02-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17079#discussion_r103349639
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
 ---
@@ -178,6 +178,34 @@ class FileIndexSuite extends SharedSQLContext {
   assert(catalog2.allFiles().nonEmpty)
 }
   }
+
+  test("refresh for InMemoryFileIndex with FileStatusCache") {
+withTempDir { dir =>
+  val fileStatusCache = FileStatusCache.getOrCreate(spark)
+  val dirPath = new Path(dir.getAbsolutePath)
+  val catalog = new InMemoryFileIndex(spark, Seq(dirPath), Map.empty,
+None, fileStatusCache) {
--- End diff --

nit:
```
val catalog =
  new XXX(...) {
def xxx
  }
```





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103349590
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala 
---
@@ -248,18 +248,18 @@ class ALSModel private[ml] (
   @Since("1.3.0")
   def setPredictionCol(value: String): this.type = set(predictionCol, 
value)
 
+  private val predict = udf { (userFeatures: Seq[Float], itemFeatures: 
Seq[Float]) =>
+if (userFeatures != null && itemFeatures != null) {
+  blas.sdot(rank, userFeatures.toArray, 1, itemFeatures.toArray, 1)
--- End diff --

I wonder how the overhead of converting to an array compares with the 
efficiency of calling sdot -- could it be faster to just do the dot product 
over the Seqs by hand? Is it possible to operate on something besides Seq?
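A sketch of the "by hand" alternative: a while-loop dot product over the 
Seqs, skipping the two `toArray` copies. It assumes the Seqs are array-backed 
(as UDF inputs typically are), so indexed access is O(1); whether this beats 
BLAS `sdot` at small `rank` would need benchmarking.

```
def dot(u: Seq[Float], v: Seq[Float]): Float = {
  var s = 0.0f
  var i = 0
  while (i < u.length) {
    s += u(i) * v(i)
    i += 1
  }
  s
}
```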





[GitHub] spark issue #17090: [Spark-19535][ML] RecommendForAllUsers RecommendForAllIt...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17090
  
**[Test build #73540 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73540/testReport)**
 for PR 17090 at commit 
[`707bc6b`](https://github.com/apache/spark/commit/707bc6b153a7f899fbf3fe2a5675cacba1f95711).





[GitHub] spark issue #17089: [SPARK-19756][SQL] drop the table cache after inserting ...

2017-02-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17089
  
**[Test build #73541 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73541/testReport)**
 for PR 17089 at commit 
[`8bca8d3`](https://github.com/apache/spark/commit/8bca8d35e04e582f73052411e42811a8c90329de).





[GitHub] spark pull request #17079: [SPARK-19748][SQL]refresh function has a wrong or...

2017-02-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17079#discussion_r103350023
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
 ---
@@ -178,6 +178,34 @@ class FileIndexSuite extends SharedSQLContext {
   assert(catalog2.allFiles().nonEmpty)
 }
   }
+
+  test("refresh for InMemoryFileIndex with FileStatusCache") {
+withTempDir { dir =>
+  val fileStatusCache = FileStatusCache.getOrCreate(spark)
+  val dirPath = new Path(dir.getAbsolutePath)
+  val catalog = new InMemoryFileIndex(spark, Seq(dirPath), Map.empty,
+None, fileStatusCache) {
+def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
+def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
+  }
+
+  assert(catalog.leafDirPaths.isEmpty)
+  assert(catalog.leafFilePaths.isEmpty)
+
+  val file = new File(dir, "text.txt")
+  stringToFile(file, "text")
+
+  catalog.refresh()
+
+  assert(catalog.leafFilePaths.size == 1)
+  assert(catalog.leafFilePaths.head.toString.stripSuffix("/") ==
+s"file:${file.getAbsolutePath.stripSuffix("/")}")
--- End diff --

This looks hacky; can you turn them into `Path`s and compare?
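A sketch of the suggested comparison; `Path` normalizes trailing slashes, so 
the `stripSuffix` munging can go away (assuming the `file` scheme on both 
sides):

```
import org.apache.hadoop.fs.Path

assert(catalog.leafFilePaths.head == new Path(s"file:${file.getAbsolutePath}"))
```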





[GitHub] spark issue #17079: [SPARK-19748][SQL]refresh function has a wrong order to ...

2017-02-27 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/17079
  
Good catch! Can you show a real example that fails because of this bug? I'm 
wondering why the existing unit tests didn't expose it...





[GitHub] spark pull request #17078: [SPARK-19746][ML] Faster indexing for logistic ag...

2017-02-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17078





[GitHub] spark pull request #17091: [SPARK-19757][CORE] Executor with task scheduled ...

2017-02-27 Thread jxiang
GitHub user jxiang opened a pull request:

https://github.com/apache/spark/pull/17091

[SPARK-19757][CORE] Executor with task scheduled could be killed due to 
idleness

## What changes were proposed in this pull request?
In makeOffers, do the check that an executor is alive and the scheduling of 
a task to it inside a single synchronized block, so that the executor can't 
be killed in between.
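A sketch of the idea with illustrative names (`isExecutorAlive`, `offersFor`, 
and `launchTasks` are stand-ins, not the exact Spark internals):

```
private def makeOffers(executorId: String): Unit = {
  // The liveness check and the scheduling happen in one atomic step,
  // so the idle-executor kill path cannot slip in between them.
  val taskDescs = synchronized {
    if (isExecutorAlive(executorId)) scheduler.resourceOffers(offersFor(executorId))
    else Seq.empty
  }
  launchTasks(taskDescs.flatten)
}
```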


## How was this patch tested?
Manual tests.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jxiang/spark spark-19757

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17091.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17091


commit 6b57d7b0d7ffb511e0348ef3bdcf6f1061225984
Author: Jimmy Xiang 
Date:   2017-02-28T00:27:45Z

[CORE] Executor with task scheduled could be killed due to idleness







[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread sueann
Github user sueann commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103350410
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.recommendation
+
+import scala.language.implicitConversions
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.sql.{Encoder, Encoders}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.util.BoundedPriorityQueue
+
+/**
+ * Works on rows of the form (K1, K2, V) where K1 & K2 are IDs and V is 
the score value. Finds
+ * the top `num` K2 items based on the given Ordering.
+ */
+
+private[recommendation] class TopByKeyAggregator[K1: TypeTag, K2: TypeTag, 
V: TypeTag]
+  (num: Int, ord: Ordering[(K2, V)])
+  extends Aggregator[(K1, K2, V), BoundedPriorityQueue[(K2, V)], 
Array[(K2, V)]] {
+
+  override def zero: BoundedPriorityQueue[(K2, V)] = new 
BoundedPriorityQueue[(K2, V)](num)(ord)
+  override def reduce(
--- End diff --

👍





[GitHub] spark issue #17078: [SPARK-19746][ML] Faster indexing for logistic aggregato...

2017-02-27 Thread dbtsai
Github user dbtsai commented on the issue:

https://github.com/apache/spark/pull/17078
  
Thanks. Merged into master.





[GitHub] spark pull request #17090: [Spark-19535][ML] RecommendForAllUsers RecommendF...

2017-02-27 Thread sueann
Github user sueann commented on a diff in the pull request:

https://github.com/apache/spark/pull/17090#discussion_r103350775
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala 
---
@@ -248,18 +248,18 @@ class ALSModel private[ml] (
   @Since("1.3.0")
   def setPredictionCol(value: String): this.type = set(predictionCol, 
value)
 
+  private val predict = udf { (userFeatures: Seq[Float], itemFeatures: 
Seq[Float]) =>
+if (userFeatures != null && itemFeatures != null) {
+  blas.sdot(rank, userFeatures.toArray, 1, itemFeatures.toArray, 1)
--- End diff --

Good point! But since I copy-pasted this block in this PR, maybe it's okay to 
try that out in another PR? At least with what we have here we know it's not a 
regression. I want to make sure we get some version of ALS recommendForAll* in 
2.2.





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103351494
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
   (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
 }
 
-cachedData.foreach {
-  case data if data.plan.find(lookupAndRefresh(_, fs, 
qualifiedPath)).isDefined =>
-val dataIndex = cachedData.indexWhere(cd => 
data.plan.sameResult(cd.plan))
-if (dataIndex >= 0) {
-  data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking 
= true)
-  cachedData.remove(dataIndex)
-}
-
sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, 
data.plan))
-  case _ => // Do Nothing
+cachedData.filter {
--- End diff --

Can we use a Java collection, so that we can remove elements while iterating?
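A sketch of that approach: `java.util` iterators support safe removal 
mid-iteration via `Iterator.remove()`, unlike Scala's mutable buffers:

```
val list = new java.util.ArrayList[Int](java.util.Arrays.asList(1, 2, 3, 4))
val it = list.iterator()
while (it.hasNext) {
  if (it.next() % 2 == 1) it.remove()  // removes the element last returned by next()
}
// list is now [2, 4]
```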





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103307491
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -146,4 +107,153 @@ private[hive] class HiveSessionState(sparkSession: 
SparkSession)
 conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
+  /**
+   * Get an identical copy of the `HiveSessionState`.
+   * This should ideally reuse the `SessionState.clone` but cannot do so.
+   * Doing that will throw an exception when trying to clone the catalog.
+   */
+  override def clone(newSparkSession: SparkSession): HiveSessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val experimentalMethodsCopy = experimentalMethods.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  newSparkSession,
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(newSparkSession, plan)
+
+val hiveClient =
+  
newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new HiveSessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethodsCopy,
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  hiveClient,
+  HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, 
confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator,
+  HiveSessionState.createPlannerCreator(
+newSparkSession,
+confCopy,
+experimentalMethodsCopy))
+  }
+
+}
+
+object HiveSessionState {
+
+  def apply(sparkSession: SparkSession): HiveSessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): HiveSessionState = {
+
+val initHelper = SessionState(sparkSession, conf)
+
+val sparkContext = sparkSession.sparkContext
+
+val catalog = HiveSessionCatalog(
+  sparkSession,
+  SessionState.createFunctionResourceLoader(sparkContext, 
sparkSession.sharedState),
+  initHelper.functionRegistry,
+  initHelper.conf,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
initHelper.conf),
+  initHelper.sqlParser)
+
+// A Hive client used for interacting with the metastore.
+val metadataHive: HiveClient =
+  
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+// An analyzer that uses the Hive metastore.
+val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, 
initHelper.conf)
+
+val plannerCreator = createPlannerCreator(
+  sparkSession,
+  initHelper.conf,
+  initHelper.experimentalMethods)
+
+new HiveSessionState(
+  sparkContext,
+  sparkSession.sharedState,
+  initHelper.conf,
+  initHelper.experimentalMethods,
+  initHelper.functionRegistry,
+  catalog,
+  initHelper.sqlParser,
+  metadataHive,
+  analyzer,
+  initHelper.streamingQueryManager,
+  initHelper.queryExecutionCreator,
+  plannerCreator)
+  }
+
+  def createAnalyzer(
+  sparkSession: SparkSession,
+  catalog: HiveSessionCatalog,
+  sqlConf: SQLConf): Analyzer = {
+
+new Analyzer(catalog, sqlConf) {
+  override val extendedResolutionRules =
+new ResolveHiveSerdeTable(sparkSession) ::
+new FindDataSourceTable(sparkSession) ::
+new FindHiveSerdeTable(sparkSession) ::
+new ResolveSQLOnFile(sparkSession) :: Nil
+
+  override val postHocResolutionRules =
+catalog.ParquetConversions ::
+catalog.OrcConversions ::
+PreprocessTableCreation(sparkSession) ::
+PreprocessTableInsertion(sqlConf) ::
+DataSourceAnalysis(sqlConf) ::
+HiveAnalysis :: Nil
+
+  override val extendedCheckRules = Seq(PreWriteCheck)
+}
+  }
+
+  def createPlannerCreator(
+  associatedSparkSession: SparkSession,
+  sqlConf: SQLConf,
+  experimentalMethods: ExperimentalMethods): () => SparkPlanner = {
+
--- End diff 

[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103338672
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+class SessionStateSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  protected var activeSession: SparkSession = _
+
+  protected def createSession(): Unit = {
+activeSession = SparkSession.builder().master("local").getOrCreate()
+  }
+
+  override def beforeEach(): Unit = {
+createSession()
+  }
+
+  override def afterEach(): Unit = {
+activeSession.stop()
+  }
+
+  test("fork new session and inherit RuntimeConfig options") {
+val key = "spark-config-clone"
+activeSession.conf.set(key, "active")
+
+// inheritance
+val forkedSession = activeSession.cloneSession()
+assert(forkedSession ne activeSession)
+assert(forkedSession.conf ne activeSession.conf)
+assert(forkedSession.conf.get(key) == "active")
+
+// independence
+forkedSession.conf.set(key, "forked")
+assert(activeSession.conf.get(key) == "active")
+activeSession.conf.set(key, "dontcopyme")
+assert(forkedSession.conf.get(key) == "forked")
+  }
+
+  test("fork new session and inherit function registry and udf") {
+activeSession.udf.register("strlenScala", (_: String).length + (_: 
Int))
+val forkedSession = activeSession.cloneSession()
+
+// inheritance
+assert(forkedSession ne activeSession)
+assert(forkedSession.sessionState.functionRegistry ne
+  activeSession.sessionState.functionRegistry)
+
assert(forkedSession.sessionState.functionRegistry.lookupFunction("strlenScala").nonEmpty)
+
+// independence
+forkedSession.sessionState.functionRegistry.dropFunction("strlenScala")
+
assert(activeSession.sessionState.functionRegistry.lookupFunction("strlenScala").nonEmpty)
+activeSession.udf.register("addone", (_: Int) + 1)
+
assert(forkedSession.sessionState.functionRegistry.lookupFunction("addone").isEmpty)
+  }
+
+  test("fork new session and inherit experimental methods") {
+object DummyRule1 extends Rule[LogicalPlan] {
+  def apply(p: LogicalPlan): LogicalPlan = p
+}
+object DummyRule2 extends Rule[LogicalPlan] {
+  def apply(p: LogicalPlan): LogicalPlan = p
+}
+val optimizations = List(DummyRule1, DummyRule2)
+
+activeSession.experimental.extraOptimizations = optimizations
+
+val forkedSession = activeSession.cloneSession()
+
+// inheritance
+assert(forkedSession ne activeSession)
+assert(forkedSession.experimental ne activeSession.experimental)
+assert(forkedSession.experimental.extraOptimizations.toSet ==
+  activeSession.experimental.extraOptimizations.toSet)
+
+// independence
+forkedSession.experimental.extraOptimizations = List(DummyRule2)
+assert(activeSession.experimental.extraOptimizations == optimizations)
+activeSession.experimental.extraOptimizations = List(DummyRule1)
+assert(forkedSession.experimental.extraOptimizations == List(DummyRule2))
+  }
+
+  test("fork new sessions and run query on inherited table") {
+def checkTableExists(sparkSession: SparkSession): Unit = {
+  QueryTest.checkAnswer(sparkSession.sql(
+"""
+  |SELECT x.str, COUNT(*)
+  |FROM df x JOIN df y ON x.str = y.str
+  |GROUP BY x.str
+""".stripMargin),
+Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
+}
+

[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103305709
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -90,110 +208,29 @@ private[sql] class SessionState(sparkSession: SparkSession) {
 }
   }
 
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  lazy val catalog = new SessionCatalog(
-sparkSession.sharedState.externalCatalog,
-sparkSession.sharedState.globalTempViewManager,
-functionResourceLoader,
-functionRegistry,
-conf,
-newHadoopConf(),
-sqlParser)
+  def newHadoopConf(copyHadoopConf: Configuration, sqlConf: SQLConf): Configuration = {
+val hadoopConf = new Configuration(copyHadoopConf)
+sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) }
+hadoopConf
+  }
 
-  /**
-   * Interface exposed to the user for registering user-defined functions.
-   * Note that the user-defined functions must be deterministic.
-   */
-  lazy val udf: UDFRegistration = new UDFRegistration(functionRegistry)
+  def createAnalyzer(
+  sparkSession: SparkSession,
+  catalog: SessionCatalog,
+  sqlConf: SQLConf): Analyzer = {
 
-  /**
-   * Logical query plan analyzer for resolving unresolved attributes and relations.
-   */
-  lazy val analyzer: Analyzer = {
-new Analyzer(catalog, conf) {
+new Analyzer(catalog, sqlConf) {
   override val extendedResolutionRules =
 new FindDataSourceTable(sparkSession) ::
 new ResolveSQLOnFile(sparkSession) :: Nil
 
   override val postHocResolutionRules =
 PreprocessTableCreation(sparkSession) ::
-PreprocessTableInsertion(conf) ::
-DataSourceAnalysis(conf) :: Nil
+PreprocessTableInsertion(sqlConf) ::
+DataSourceAnalysis(sqlConf) :: Nil
 
   override val extendedCheckRules = Seq(PreWriteCheck, HiveOnlyCheck)
 }
   }
 
-  /**
-   * Logical query plan optimizer.
-   */
-  lazy val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods)
-
-  /**
-   * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
-   */
-  lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
-
-  /**
-   * Planner that converts optimized logical plans to physical plans.
-   */
-  def planner: SparkPlanner =
-new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)
-
-  /**
-   * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
-   * that listen for execution metrics.
-   */
-  lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
-
-  /**
-   * Interface to start and stop [[StreamingQuery]]s.
-   */
-  lazy val streamingQueryManager: StreamingQueryManager = {
-new StreamingQueryManager(sparkSession)
-  }
-
-  private val jarClassLoader: NonClosableMutableURLClassLoader =
-sparkSession.sharedState.jarClassLoader
-
-  // Automatically extract all entries and put it in our SQLConf
-  // We need to call it after all of vals have been initialized.
-  sparkSession.sparkContext.getConf.getAll.foreach { case (k, v) =>
-conf.setConfString(k, v)
-  }
-
-  // --
-  //  Helper methods, partially leftover from pre-2.0 days
-  // --
-
-  def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)
-
-  def refreshTable(tableName: String): Unit = {
-catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
-  }
-
-  def addJar(path: String): Unit = {
-sparkSession.sparkContext.addJar(path)
-
-val uri = new Path(path).toUri
-val jarURL = if (uri.getScheme == null) {
-  // `path` is a local file path without a URL scheme
-  new File(path).toURI.toURL
-} else {
-  // `path` is a URL with a scheme
-  uri.toURL
-}
-jarClassLoader.addURL(jarURL)
-Thread.currentThread().setContextClassLoader(jarClassLoader)
-  }
-
-  /**
-   * Analyzes the given table in the current database to generate statistics, which will be
-   * used in query optimizations.
-   */
-  def analyze(tableIdent: TableIdentifier, noscan: Boolean = true): Unit = {
--- End diff --

Cool.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
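
The `newHadoopConf` helper quoted in the diff above layers the session's SQL
options onto a copy of the Hadoop configuration. A minimal standalone sketch of
the same idea (the `overlaySqlConf` name and the plain Map input are
illustrative, not Spark API):

    import org.apache.hadoop.conf.Configuration

    // Copy the base configuration, then overlay the session's SQL entries;
    // the Configuration copy constructor leaves the original untouched.
    def overlaySqlConf(base: Configuration, sqlEntries: Map[String, String]): Configuration = {
      val hadoopConf = new Configuration(base)
      sqlEntries.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) }
      hadoopConf
    }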

[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r10330
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+class SessionStateSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  protected var activeSession: SparkSession = _
+
+  protected def createSession(): Unit = {
+activeSession = SparkSession.builder().master("local").getOrCreate()
+  }
+
+  override def beforeEach(): Unit = {
+createSession()
+  }
+
+  override def afterEach(): Unit = {
+activeSession.stop()
+  }
+
+  test("fork new session and inherit RuntimeConfig options") {
+val key = "spark-config-clone"
+activeSession.conf.set(key, "active")
+
+// inheritance
+val forkedSession = activeSession.cloneSession()
+assert(forkedSession ne activeSession)
+assert(forkedSession.conf ne activeSession.conf)
+assert(forkedSession.conf.get(key) == "active")
+
+// independence
+forkedSession.conf.set(key, "forked")
+assert(activeSession.conf.get(key) == "active")
+activeSession.conf.set(key, "dontcopyme")
+assert(forkedSession.conf.get(key) == "forked")
+  }
+
+  test("fork new session and inherit function registry and udf") {
+activeSession.udf.register("strlenScala", (_: String).length + (_: 
Int))
+val forkedSession = activeSession.cloneSession()
+
+// inheritance
+assert(forkedSession ne activeSession)
+assert(forkedSession.sessionState.functionRegistry ne
+  activeSession.sessionState.functionRegistry)
+assert(forkedSession.sessionState.functionRegistry.lookupFunction("strlenScala").nonEmpty)
+
+// independence
+forkedSession.sessionState.functionRegistry.dropFunction("strlenScala")
+assert(activeSession.sessionState.functionRegistry.lookupFunction("strlenScala").nonEmpty)
+activeSession.udf.register("addone", (_: Int) + 1)
+assert(forkedSession.sessionState.functionRegistry.lookupFunction("addone").isEmpty)
+  }
+
+  test("fork new session and inherit experimental methods") {
+object DummyRule1 extends Rule[LogicalPlan] {
+  def apply(p: LogicalPlan): LogicalPlan = p
+}
+object DummyRule2 extends Rule[LogicalPlan] {
+  def apply(p: LogicalPlan): LogicalPlan = p
+}
+val optimizations = List(DummyRule1, DummyRule2)
+
+activeSession.experimental.extraOptimizations = optimizations
+
--- End diff --

removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
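
As a hedged usage sketch of the cloneSession behavior exercised in the
RuntimeConfig test quoted above (options copy over at clone time; later writes
on either session stay local to it):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local").getOrCreate()
    spark.conf.set("spark-config-clone", "active")

    val forked = spark.cloneSession()               // inherits "active"
    forked.conf.set("spark-config-clone", "forked") // does not leak back

    assert(spark.conf.get("spark-config-clone") == "active")
    assert(forked.conf.get("spark-config-clone") == "forked")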



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103337066
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
@@ -136,6 +139,26 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
 }
 SparkSession.sqlListener.get()
   }
+
+  /*
+   * This belongs here more than in `SessionState`. However, does not seem that it can be
--- End diff --

Removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103302329
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -65,22 +82,118 @@ private[sql] class SessionState(sparkSession: SparkSession) {
 hadoopConf
   }
 
-  lazy val experimentalMethods = new ExperimentalMethods
-
   /**
-   * Internal catalog for managing functions registered by the user.
+   * Get an identical copy of the `SessionState` and associate it with the given `SparkSession`
*/
-  lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
+  def clone(newSparkSession: SparkSession): SessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan)
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new SessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethods.clone(),
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator)
+  }
+
+  // --
+  //  Helper methods, partially leftover from pre-2.0 days
+  // --
+
+  def executePlan(plan: LogicalPlan): QueryExecution = 
queryExecutionCreator(plan)
+
+  def refreshTable(tableName: String): Unit = {
+catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
+  }
+
+  def addJar(path: String): Unit = sharedState.addJar(path)
+}
+
+
+object SessionState {
+
+  def apply(sparkSession: SparkSession): SessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): SessionState = {
+
+val sparkContext = sparkSession.sparkContext
+
+// SQL-specific key-value configurations.
+val sqlConf = conf.getOrElse(new SQLConf)
+
+// Automatically extract all entries and put them in our SQLConf
+mergeSparkConf(sqlConf, sparkContext.getConf)
+
+// Internal catalog for managing functions registered by the user.
+val functionRegistry = FunctionRegistry.builtin.clone()
+
+// A class for loading resources specified by a function.
+val functionResourceLoader: FunctionResourceLoader =
+  createFunctionResourceLoader(sparkContext, sparkSession.sharedState)
+
+// Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
+val sqlParser: ParserInterface = new SparkSqlParser(sqlConf)
+
+// Internal catalog for managing table and database states.
+val catalog = new SessionCatalog(
+  sparkSession.sharedState.externalCatalog,
+  sparkSession.sharedState.globalTempViewManager,
+  functionResourceLoader,
+  functionRegistry,
+  sqlConf,
+  newHadoopConf(sparkContext.hadoopConfiguration, sqlConf),
+  sqlParser)
+
+// Logical query plan analyzer for resolving unresolved attributes and relations.
+val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, sqlConf)
+
+// Interface to start and stop [[StreamingQuery]]s.
+val streamingQueryManager: StreamingQueryManager = new StreamingQueryManager(sparkSession)
+
+val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(sparkSession, plan)
+
+new SessionState(
+  sparkContext,
+  sparkSession.sharedState,
+  sqlConf,
+  new ExperimentalMethods,
+  functionRegistry,
+  catalog,
+  sqlParser,
+  analyzer,
+  streamingQueryManager,
+  queryExecutionCreator)
+  }
+
+  def createFunctionResourceLoader(
--- End diff --

`createFunctionResourceLoader` is also used in `HiveSessionState.apply`, 
private would make it inaccessible.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
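
Both `clone` and `apply` in the diff above finish by calling
`SessionState.mergeSparkConf(...)` so the fresh SQLConf picks up every
SparkContext-level setting. A minimal sketch of that step, assuming it simply
folds SparkConf entries into the SQLConf the way the pre-refactor foreach loop
did (SQLConf is internal to the sql package, so this only compiles there):

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.internal.SQLConf

    // Copy every (key, value) pair from the context's SparkConf into the
    // session's SQLConf.
    def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = {
      sparkConf.getAll.foreach { case (k, v) => sqlConf.setConfString(k, v) }
    }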

[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103295639
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 ---
@@ -1178,4 +1181,36 @@ class SessionCatalog(
 }
   }
 
+  /**
+   * Get an identical copy of the `SessionCatalog`.
+   * The temporary tables and function registry are retained.
--- End diff --

Changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103307776
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -146,4 +107,153 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
 conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
+  /**
+   * Get an identical copy of the `HiveSessionState`.
+   * This should ideally reuse the `SessionState.clone` but cannot do so.
+   * Doing that will throw an exception when trying to clone the catalog.
+   */
+  override def clone(newSparkSession: SparkSession): HiveSessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val experimentalMethodsCopy = experimentalMethods.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  newSparkSession,
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan)
+
+val hiveClient =
+  newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new HiveSessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethodsCopy,
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  hiveClient,
+  HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator,
+  HiveSessionState.createPlannerCreator(
+newSparkSession,
+confCopy,
+experimentalMethodsCopy))
+  }
+
+}
+
+object HiveSessionState {
+
+  def apply(sparkSession: SparkSession): HiveSessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): HiveSessionState = {
+
+val initHelper = SessionState(sparkSession, conf)
+
+val sparkContext = sparkSession.sparkContext
+
+val catalog = HiveSessionCatalog(
+  sparkSession,
+  SessionState.createFunctionResourceLoader(sparkContext, sparkSession.sharedState),
+  initHelper.functionRegistry,
+  initHelper.conf,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, initHelper.conf),
+  initHelper.sqlParser)
+
+// A Hive client used for interacting with the metastore.
+val metadataHive: HiveClient =
+  sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+// An analyzer that uses the Hive metastore.
+val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, initHelper.conf)
+
+val plannerCreator = createPlannerCreator(
+  sparkSession,
+  initHelper.conf,
+  initHelper.experimentalMethods)
+
+new HiveSessionState(
+  sparkContext,
+  sparkSession.sharedState,
+  initHelper.conf,
+  initHelper.experimentalMethods,
+  initHelper.functionRegistry,
+  catalog,
+  initHelper.sqlParser,
+  metadataHive,
+  analyzer,
+  initHelper.streamingQueryManager,
+  initHelper.queryExecutionCreator,
+  plannerCreator)
+  }
+
+  def createAnalyzer(
+  sparkSession: SparkSession,
+  catalog: HiveSessionCatalog,
+  sqlConf: SQLConf): Analyzer = {
+
+new Analyzer(catalog, sqlConf) {
+  override val extendedResolutionRules =
+new ResolveHiveSerdeTable(sparkSession) ::
+new FindDataSourceTable(sparkSession) ::
+new FindHiveSerdeTable(sparkSession) ::
+new ResolveSQLOnFile(sparkSession) :: Nil
+
+  override val postHocResolutionRules =
+catalog.ParquetConversions ::
+catalog.OrcConversions ::
+PreprocessTableCreation(sparkSession) ::
+PreprocessTableInsertion(sqlConf) ::
+DataSourceAnalysis(sqlConf) ::
+HiveAnalysis :: Nil
+
+  override val extendedCheckRules = Seq(PreWriteCheck)
+}
+  }
+
+  def createPlannerCreator(
+  associatedSparkSession: SparkSession,
+  sqlConf: SQLConf,
+  experimentalMethods: ExperimentalMethods): () => SparkPlanner = {
+
+() =>
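
The diff is cut off right at the `() =>` that opens `createPlannerCreator`'s
result, but the shape is visible from the signature: it returns a
`() => SparkPlanner` thunk rather than a planner instance. A toy sketch of why
a thunk helps (illustrative types only, not the actual Spark classes):

    class Planner(val strategies: Seq[String])

    // Each invocation re-reads the current strategies, so a planner built
    // later observes strategies added to experimental methods in the meantime.
    def plannerCreator(strategies: () => Seq[String]): () => Planner =
      () => new Planner(strategies())

    var extras = Seq("JoinSelection")
    val creator = plannerCreator(() => extras)
    extras = extras :+ "Aggregation"
    val planner = creator()   // sees both strategies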

[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103328320
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala 
---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+class HiveSessionStateSuite extends SessionStateSuite
+  with TestHiveSingleton with BeforeAndAfterEach {
+
+  override def beforeEach(): Unit = {
+createSession()
+  }
+
+  override def afterEach(): Unit = {}
+
+  override def createSession(): Unit = {
+activeSession = spark.newSession()
+  }
+
--- End diff --

removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103303272
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -90,110 +203,37 @@ private[sql] class SessionState(sparkSession: SparkSession) {
 }
   }
 
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  lazy val catalog = new SessionCatalog(
-sparkSession.sharedState.externalCatalog,
-sparkSession.sharedState.globalTempViewManager,
-functionResourceLoader,
-functionRegistry,
-conf,
-newHadoopConf(),
-sqlParser)
+  def newHadoopConf(copyHadoopConf: Configuration, sqlConf: SQLConf): Configuration = {
--- End diff --

Changed name.
This is also used in `HiveSessionState.apply`, would be rendered 
inaccessible if `private`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103295692
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 ---
@@ -1178,4 +1181,36 @@ class SessionCatalog(
 }
   }
 
+  /**
+   * Get an identical copy of the `SessionCatalog`.
+   * The temporary tables and function registry are retained.
+   * The table relation cache will not be populated.
+   * @note `externalCatalog` and `globalTempViewManager` are from shared state, don't need deep
+   * copy. `FunctionResourceLoader` is effectively stateless, also does not need deep copy.
+   * All arguments passed in should be associated with a particular `SparkSession`.
+   */
+  def clone(
+  conf: CatalystConf,
+  hadoopConf: Configuration,
+  functionRegistry: FunctionRegistry,
+  parser: ParserInterface): SessionCatalog = {
+
+val catalog = new SessionCatalog(
+  externalCatalog,
+  globalTempViewManager,
+  functionResourceLoader,
+  functionRegistry,
+  conf,
+  hadoopConf,
+  parser)
+
+synchronized {
+  catalog.currentDb = currentDb
+  // copy over temporary tables
+  tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2))
+}
+
+catalog
+  }
+
--- End diff --

Removed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103306212
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
@@ -136,6 +139,26 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
 }
 SparkSession.sqlListener.get()
   }
+
+  /*
+   * This belongs here more than in `SessionState`. However, does not seem that it can be
+   * removed from `SessionState` and `HiveSessionState` without using reflection in
+   * `AddJarCommand`.
+   */
+  def addJar(path: String): Unit = {
+sparkContext.addJar(path)
+
--- End diff --

removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103307299
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala ---
@@ -212,3 +247,31 @@ private[sql] class HiveSessionCatalog(
 "histogram_numeric"
   )
 }
+
+private[sql] object HiveSessionCatalog {
+
+  def apply(
+  sparkSession: SparkSession,
+  functionResourceLoader: FunctionResourceLoader,
+  functionRegistry: FunctionRegistry,
+  conf: SQLConf,
+  hadoopConf: Configuration,
+  parser: ParserInterface): HiveSessionCatalog = {
+
+// Catalog for handling data source tables. TODO: This really doesn't belong here since it is
+// essentially a cache for metastore tables. However, it relies on a lot of session-specific
+// things so it would be a lot of work to split its functionality between HiveSessionCatalog
+// and HiveCatalog. We should still do it at some point...
+val metastoreCatalog = new HiveMetastoreCatalog(sparkSession)
+
+new HiveSessionCatalog(
+  sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
+  sparkSession.sharedState.globalTempViewManager,
+  metastoreCatalog,
+  functionResourceLoader: FunctionResourceLoader,
--- End diff --

Removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
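
The flagged argument above, `functionResourceLoader: FunctionResourceLoader`,
is a type ascription in an argument position: it compiles but changes nothing,
which is why it was removed. A two-line illustration of the Scala rule:

    def id(x: Int): Int = x
    val a = id(3: Int)   // the ascription is redundant; identical to id(3)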



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103331408
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -17,89 +17,50 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.Analyzer
-import org.apache.spark.sql.execution.SparkPlanner
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner, SparkSqlParser}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.streaming.StreamingQueryManager
 
 
 /**
 * A class that holds all session-specific state in a given [[SparkSession]] backed by Hive.
  */
-private[hive] class HiveSessionState(sparkSession: SparkSession)
-  extends SessionState(sparkSession) {
-
-  self =>
-
-  /**
-   * A Hive client used for interacting with the metastore.
-   */
-  lazy val metadataHive: HiveClient =
-sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.newSession()
-
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  override lazy val catalog = {
-new HiveSessionCatalog(
-  sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
-  sparkSession.sharedState.globalTempViewManager,
-  sparkSession,
-  functionResourceLoader,
-  functionRegistry,
+private[hive] class HiveSessionState(
+sparkContext: SparkContext,
+sharedState: SharedState,
+conf: SQLConf,
+experimentalMethods: ExperimentalMethods,
+functionRegistry: FunctionRegistry,
+override val catalog: HiveSessionCatalog,
+sqlParser: ParserInterface,
+val metadataHive: HiveClient,
+override val analyzer: Analyzer,
--- End diff --

Previous implementation needed it to be that way. But can remove `override` 
now. Good catch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
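
On the `override val` point above: in Scala, `override` on a constructor
parameter is required only when the superclass already declares a concrete
member of the same name. A minimal illustration (names hypothetical):

    class Base(val catalog: String)

    // `override val` is mandatory here because Base defines `catalog`.
    class Derived(override val catalog: String) extends Base(catalog)

    // Once the parent only declares it abstractly, a plain `val` suffices.
    abstract class Base2 { def catalog: String }
    class Derived2(val catalog: String) extends Base2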



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103307383
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -146,4 +107,153 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
 conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
+  /**
+   * Get an identical copy of the `HiveSessionState`.
+   * This should ideally reuse the `SessionState.clone` but cannot do so.
+   * Doing that will throw an exception when trying to clone the catalog.
+   */
+  override def clone(newSparkSession: SparkSession): HiveSessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val experimentalMethodsCopy = experimentalMethods.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  newSparkSession,
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan)
+
+val hiveClient =
+  newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new HiveSessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethodsCopy,
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  hiveClient,
+  HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator,
+  HiveSessionState.createPlannerCreator(
+newSparkSession,
+confCopy,
+experimentalMethodsCopy))
+  }
+
+}
+
+object HiveSessionState {
+
+  def apply(sparkSession: SparkSession): HiveSessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
--- End diff --

Changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103329699
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala ---
@@ -144,11 +145,37 @@ private[hive] class TestHiveSparkSession(
 existingSharedState.getOrElse(new SharedState(sc))
   }
 
-  // TODO: Let's remove TestHiveSessionState. Otherwise, we are not really testing the reflection
-  // logic based on the setting of CATALOG_IMPLEMENTATION.
+  private def createHiveSessionState: HiveSessionState = {
--- End diff --

Neat, changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103298123
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala ---
@@ -46,4 +46,10 @@ class ExperimentalMethods private[sql]() {
 
   @volatile var extraOptimizations: Seq[Rule[LogicalPlan]] = Nil
 
+  override def clone(): ExperimentalMethods = {
--- End diff --

Good point, added a sync block.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
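
On the sync block mentioned above: `extraOptimizations` (and its sibling
`extraStrategies`) are mutable fields, so the copy is taken while holding the
instance lock. A sketch of what such a clone could look like (assumed shape,
not necessarily the merged code):

    override def clone(): ExperimentalMethods = synchronized {
      val result = new ExperimentalMethods
      result.extraStrategies = extraStrategies
      result.extraOptimizations = extraOptimizations
      result
    }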



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103297916
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 ---
@@ -1196,4 +1198,28 @@ class SessionCatalogSuite extends PlanTest {
   catalog.listFunctions("unknown_db", "func*")
 }
   }
+
+  test("copy SessionCatalog") {
+val externalCatalog = newEmptyCatalog()
+val original = new SessionCatalog(externalCatalog)
+val tempTable1 = Range(1, 10, 1, 10)
+original.createTempView("copytest1", tempTable1, overrideIfExists = 
false)
+
+// check if tables copied over
+val clone = original.clone(
+  SimpleCatalystConf(caseSensitiveAnalysis = true),
+  new Configuration(),
+  new SimpleFunctionRegistry,
+  CatalystSqlParser)
+assert(original ne clone)
+assert(clone.getTempView("copytest1") == Option(tempTable1))
+
+// check if clone and original independent
+clone.dropTable(TableIdentifier("copytest1"), ignoreIfNotExists = 
false, purge = false)
+assert(original.getTempView("copytest1") == Option(tempTable1))
+
+val tempTable2 = Range(1, 20, 2, 10)
--- End diff --

Added a test for this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103336696
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -65,22 +82,118 @@ private[sql] class SessionState(sparkSession: SparkSession) {
 hadoopConf
   }
 
-  lazy val experimentalMethods = new ExperimentalMethods
-
   /**
-   * Internal catalog for managing functions registered by the user.
+   * Get an identical copy of the `SessionState` and associate it with the given `SparkSession`
*/
-  lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
+  def clone(newSparkSession: SparkSession): SessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan)
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new SessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethods.clone(),
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator)
+  }
+
+  // --
+  //  Helper methods, partially leftover from pre-2.0 days
+  // --
+
+  def executePlan(plan: LogicalPlan): QueryExecution = queryExecutionCreator(plan)
+
+  def refreshTable(tableName: String): Unit = {
+catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
+  }
+
+  def addJar(path: String): Unit = sharedState.addJar(path)
+}
+
+
+object SessionState {
+
+  def apply(sparkSession: SparkSession): SessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): SessionState = {
+
+val sparkContext = sparkSession.sparkContext
+
+// SQL-specific key-value configurations.
--- End diff --

changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103307420
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -146,4 +107,153 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
 conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
+  /**
+   * Get an identical copy of the `HiveSessionState`.
+   * This should ideally reuse the `SessionState.clone` but cannot do so.
+   * Doing that will throw an exception when trying to clone the catalog.
+   */
+  override def clone(newSparkSession: SparkSession): HiveSessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val experimentalMethodsCopy = experimentalMethods.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  newSparkSession,
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan)
+
+val hiveClient =
+  newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new HiveSessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethodsCopy,
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  hiveClient,
+  HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator,
+  HiveSessionState.createPlannerCreator(
+newSparkSession,
+confCopy,
+experimentalMethodsCopy))
+  }
+
+}
+
+object HiveSessionState {
+
+  def apply(sparkSession: SparkSession): HiveSessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): HiveSessionState = {
+
+val initHelper = SessionState(sparkSession, conf)
+
+val sparkContext = sparkSession.sparkContext
+
+val catalog = HiveSessionCatalog(
+  sparkSession,
+  SessionState.createFunctionResourceLoader(sparkContext, sparkSession.sharedState),
+  initHelper.functionRegistry,
+  initHelper.conf,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, initHelper.conf),
+  initHelper.sqlParser)
+
+// A Hive client used for interacting with the metastore.
+val metadataHive: HiveClient =
+  sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+// An analyzer that uses the Hive metastore.
+val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, initHelper.conf)
+
+val plannerCreator = createPlannerCreator(
+  sparkSession,
+  initHelper.conf,
+  initHelper.experimentalMethods)
+
+new HiveSessionState(
+  sparkContext,
+  sparkSession.sharedState,
+  initHelper.conf,
+  initHelper.experimentalMethods,
+  initHelper.functionRegistry,
+  catalog,
+  initHelper.sqlParser,
+  metadataHive,
+  analyzer,
+  initHelper.streamingQueryManager,
+  initHelper.queryExecutionCreator,
+  plannerCreator)
+  }
+
+  def createAnalyzer(
--- End diff --

Yes!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103336676
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -65,22 +82,118 @@ private[sql] class SessionState(sparkSession: SparkSession) {
 hadoopConf
   }
 
-  lazy val experimentalMethods = new ExperimentalMethods
-
   /**
-   * Internal catalog for managing functions registered by the user.
+   * Get an identical copy of the `SessionState` and associate it with the given `SparkSession`
*/
-  lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
+  def clone(newSparkSession: SparkSession): SessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan)
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new SessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethods.clone(),
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator)
+  }
+
+  // --
+  //  Helper methods, partially leftover from pre-2.0 days
+  // --
+
+  def executePlan(plan: LogicalPlan): QueryExecution = queryExecutionCreator(plan)
+
+  def refreshTable(tableName: String): Unit = {
+catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
+  }
+
+  def addJar(path: String): Unit = sharedState.addJar(path)
+}
+
+
+object SessionState {
+
+  def apply(sparkSession: SparkSession): SessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): SessionState = {
+
--- End diff --

Removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


