[GitHub] spark issue #21083: [SPARK-23564][SQL] infer additional filters from constra...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21083
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21083: [SPARK-23564][SQL] infer additional filters from constra...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21083
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2572/
Test PASSed.


---




[GitHub] spark issue #21083: [SPARK-23564][SQL] infer additional filters from constra...

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21083
  
**[Test build #89701 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89701/testReport)**
 for PR 21083 at commit 
[`787cddf`](https://github.com/apache/spark/commit/787cddffeba0f21cd40312bcbf84d1bb75126044).


---




[GitHub] spark issue #21083: [SPARK-23564][SQL] infer additional filters from constra...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/21083
  
retest this please


---




[GitHub] spark issue #21100: [SPARK-24012][SQL] Union of map and other compatible col...

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21100
  
**[Test build #89700 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89700/testReport)**
 for PR 21100 at commit 
[`0845739`](https://github.com/apache/spark/commit/08457394624567de222c89814fe632f9cb1a).


---




[GitHub] spark issue #21125: [Spark-24024] Fix poisson deviance calculations in GLM t...

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21125
  
**[Test build #89699 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89699/testReport)**
 for PR 21125 at commit 
[`3c6a4da`](https://github.com/apache/spark/commit/3c6a4dab973851e385b6c9a2c77e5684ad6171a4).


---




[GitHub] spark issue #20980: [SPARK-23589][SQL] ExternalMapToCatalyst should support ...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20980
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20980: [SPARK-23589][SQL] ExternalMapToCatalyst should support ...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20980
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2571/
Test PASSed.


---




[GitHub] spark issue #21125: [Spark-24024] Fix poisson deviance calculations in GLM t...

2018-04-22 Thread dbtsai
Github user dbtsai commented on the issue:

https://github.com/apache/spark/pull/21125
  
ok to test


---




[GitHub] spark issue #20980: [SPARK-23589][SQL] ExternalMapToCatalyst should support ...

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20980
  
**[Test build #89698 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89698/testReport)**
 for PR 20980 at commit 
[`eaef6b3`](https://github.com/apache/spark/commit/eaef6b374f86835bb08b9abf6d09d28aec1da9a8).


---




[GitHub] spark issue #21125: [Spark-24024] Fix poisson deviance calculations in GLM t...

2018-04-22 Thread dbtsai
Github user dbtsai commented on the issue:

https://github.com/apache/spark/pull/21125
  
Jenkins, please test this.


---




[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21124
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89694/
Test PASSed.


---




[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21124
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21124
  
**[Test build #89694 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89694/testReport)**
 for PR 21124 at commit 
[`304498e`](https://github.com/apache/spark/commit/304498eaf3ea0bd8a52a150257dc8b38a11c4108).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20940
  
**[Test build #89697 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89697/testReport)**
 for PR 20940 at commit 
[`ae8a388`](https://github.com/apache/spark/commit/ae8a388405d8d3402b5b6a45a7c7855d90538edb).


---




[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-04-22 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/20940
  
@edwinalu sorry, it looks like there are conflicts; we would need to rebase.


---




[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-04-22 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/20940
  
Jenkins, retest this please


---




[GitHub] spark issue #21033: [SPARK-19320][MESOS]allow specifying a hard limit on num...

2018-04-22 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/21033
  
ping @yanji84 


---




[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r183274730
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.ReadSupport
+import org.apache.spark.sql.sources.v2.WriteSupport
+
+/**
+ * The base class for file data source v2. Implementations must have a 
public, 0-arg constructor.
+ *
+ * Note that this is an empty interface. Data source implementations 
should mix-in at least one of
+ * the plug-in interfaces like {@link ReadSupport} and {@link 
WriteSupport}. Otherwise it's just
+ * a dummy data source which is un-readable/writable.
+ */
+trait FileDataSourceV2 extends DataSourceV2 with DataSourceRegister {
+  /**
+   * Returns an optional V1 [[FileFormat]] class of the same file data 
source.
+   * This is a solution for the following cases:
+   * 1. File datasource V2 might be implemented partially during migration.
+   *E.g. if [[ReadSupport]] is implemented while [[WriteSupport]] is 
not,
+   *write path should fall back to V1 implementation.
+   * 2. File datasource V2 implementations cause regression.
+   * 3. Catalog support is required, which is still under development for 
data source V2.
+   */
+  def fallBackFileFormat: Option[Class[_]] = None
--- End diff --

Why is it optional?


---




[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r183274704
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.ReadSupport
+import org.apache.spark.sql.sources.v2.WriteSupport
+
+/**
+ * The base class for file data source v2. Implementations must have a 
public, 0-arg constructor.
+ *
+ * Note that this is an empty interface. Data source implementations 
should mix-in at least one of
+ * the plug-in interfaces like {@link ReadSupport} and {@link 
WriteSupport}. Otherwise it's just
+ * a dummy data source which is un-readable/writable.
--- End diff --

We don't need to copy the javadoc from the parent class. Just say `A base 
interface for data source v2 implementations of the built-in file-based data 
sources.`


---




[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r183274588
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ---
@@ -213,6 +215,25 @@ case class DataSourceAnalysis(conf: SQLConf) extends 
Rule[LogicalPlan] with Cast
   }
 }
 
+/**
+ * Replaces [[FileDataSourceV2]] with [[DataSource]] if parent node is 
[[InsertIntoTable]].
+ */
+class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
--- End diff --

This needs a few more comments about when this can happen.


---




[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21123#discussion_r183274501
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -89,8 +91,13 @@ case class DataSource(
 
   case class SourceInfo(name: String, schema: StructType, 
partitionColumns: Seq[String])
 
-  lazy val providingClass: Class[_] =
-DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
+  lazy val providingClass: Class[_] = {
+val cls = DataSource.lookupDataSource(className, 
sparkSession.sessionState.conf)
+cls.newInstance() match {
+  case f: FileDataSourceV2 => f.fallBackFileFormat.getOrElse(cls)
--- End diff --

why do we need this?


---




[GitHub] spark pull request #21072: [SPARK-23973][SQL] Remove consecutive Sorts

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21072#discussion_r183272597
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -736,12 +736,22 @@ object EliminateSorts extends Rule[LogicalPlan] {
 }
 
 /**
- * Removes Sort operation if the child is already sorted
+ * Removes redundant Sort operation. This can happen:
+ * 1) if the child is already sorted
+ * 2) if there is another Sort operator separated by 0...n Project/Filter 
operators
  */
 object RemoveRedundantSorts extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
 case Sort(orders, true, child) if 
SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
   child
+case s @ Sort(_, _, child) => s.copy(child = 
recursiveRemoveSort(child))
+  }
+
+  def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = plan match {
+case Project(fields, child) => Project(fields, 
recursiveRemoveSort(child))
+case Filter(condition, child) => Filter(condition, 
recursiveRemoveSort(child))
--- End diff --

we should at least add `ResolvedHint`. To easily expand the white list in 
the future, I'd like to change the code style to
```
def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = plan match {
  case s: Sort => recursiveRemoveSort(s.child)
  case other if canEliminateSort(other) => 
other.withNewChildren(other.children.map(recursiveRemoveSort))
  case _ => plan
}

def canEliminateSort(plan: LogicalPlan): Boolean = plan match {
  case p: Project => p.projectList.forall(_.deterministic)
  case f: Filter => f.condition.deterministic
  case _: ResolvedHint => true
  ...
  case _ => false
}
```


---




[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20937
  
LGTM except a few minor comments


---




[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r183271569
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -361,6 +361,12 @@ class JacksonParser(
 // For such records, all fields other than the field configured by
 // `columnNameOfCorruptRecord` are set to `null`.
 throw BadRecordException(() => recordLiteral(record), () => None, 
e)
+  case e: CharConversionException if options.encoding.isEmpty =>
+val msg =
+  """JSON parser cannot handle a character in its input.
+|Specifying encoding as an input option explicitly might help 
to resolve the issue.
+|""".stripMargin + e.getMessage
+throw new CharConversionException(msg)
--- End diff --

BTW we should also follow the existing rule and wrap the exception with 
`BadRecordException`. See the code above.


---




[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r183271309
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -361,6 +361,12 @@ class JacksonParser(
 // For such records, all fields other than the field configured by
 // `columnNameOfCorruptRecord` are set to `null`.
 throw BadRecordException(() => recordLiteral(record), () => None, 
e)
+  case e: CharConversionException if options.encoding.isEmpty =>
+val msg =
+  """JSON parser cannot handle a character in its input.
+|Specifying encoding as an input option explicitly might help 
to resolve the issue.
+|""".stripMargin + e.getMessage
+throw new CharConversionException(msg)
--- End diff --

This will lose the original stack trace; we should do `throw 
BadRecordException(() => recordLiteral(record), () => None, e)`


---




[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r183271101
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 ---
@@ -86,14 +86,41 @@ private[sql] class JSONOptions(
 
   val multiLine = 
parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
+  /**
+   * A string between two consecutive JSON records.
+   */
   val lineSeparator: Option[String] = parameters.get("lineSep").map { sep 
=>
 require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
 sep
   }
-  // Note that the option 'lineSep' uses a different default value in read 
and write.
-  val lineSeparatorInRead: Option[Array[Byte]] =
-lineSeparator.map(_.getBytes(StandardCharsets.UTF_8))
-  // Note that JSON uses writer with UTF-8 charset. This string will be 
written out as UTF-8.
+
+  /**
+   * Standard encoding (charset) name. For example UTF-8, UTF-16LE and 
UTF-32BE.
+   * If the encoding is not specified (None), it will be detected 
automatically
+   * when the multiLine option is set to `true`.
+   */
+  val encoding: Option[String] = parameters.get("encoding")
+.orElse(parameters.get("charset")).map { enc =>
+  // The following encodings are not supported in per-line mode 
(multiline is false)
+  // because they cause some problems in reading files with BOM which 
is supposed to
+  // present in the files with such encodings. After splitting input 
files by lines,
+  // only the first lines will have the BOM which leads to 
impossibility for reading
+  // the rest lines. Besides of that, the lineSep option must have the 
BOM in such
+  // encodings which can never present between lines.
+  val blacklist = Seq(Charset.forName("UTF-16"), 
Charset.forName("UTF-32"))
+  val isBlacklisted = blacklist.contains(Charset.forName(enc))
+  require(multiLine || !isBlacklisted,
+s"""The ${enc} encoding must not be included in the blacklist when 
multiLine is disabled:
+   | ${blacklist.mkString(", ")}""".stripMargin)
+
+  val forcingLineSep = !(multiLine == false && enc != "UTF-8" && 
lineSeparator.isEmpty)
--- End diff --

`enc != "UTF-8"`, we should not compare string directly, but turn them into 
`Charset`
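
For illustration, a minimal Scala sketch of the difference (the `enc` value and object name are made up, not from the PR):

```scala
import java.nio.charset.{Charset, StandardCharsets}

object CharsetCompareSketch {
  def main(args: Array[String]): Unit = {
    val enc = "utf-8" // hypothetical user-supplied value of the "encoding"/"charset" option
    // A plain string comparison misses case differences and aliases:
    val byString = enc != "UTF-8"                                   // true, even though it is UTF-8
    // Comparing Charset objects normalizes the name first:
    val byCharset = Charset.forName(enc) != StandardCharsets.UTF_8  // false
    println(s"byString=$byString, byCharset=$byCharset")
  }
}
```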


---




[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r183270773
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -773,6 +776,8 @@ def json(self, path, mode=None, compression=None, 
dateFormat=None, timestampForm
 formats follow the formats at 
``java.text.SimpleDateFormat``.
 This applies to timestamp type. If None is 
set, it uses the
 default value, 
``-MM-dd'T'HH:mm:ss.SSSXXX``.
+:param encoding: specifies encoding (charset) of saved json files. 
If None is set,
+the default UTF-8 charset will be used.
--- End diff --

Shall we mention that, if `encoding` is set, `lineSep` also needs to be set 
when `multiLine` is false?


---




[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r183270654
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -237,6 +237,9 @@ def json(self, path, schema=None, 
primitivesAsString=None, prefersDecimal=None,
 :param allowUnquotedControlChars: allows JSON Strings to contain 
unquoted control
   characters (ASCII characters 
with value less than 32,
   including tab and line feed 
characters) or not.
+:param encoding: standard encoding (charset) name, for example 
UTF-8, UTF-16LE and UTF-32BE.
+ If None is set, the encoding of input JSON will 
be detected automatically
+ when the multiLine option is set to ``true``.
--- End diff --

Does it mean users have to set the encoding if `multiLine` is false?


---




[GitHub] spark pull request #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to b...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21122#discussion_r183270323
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
 ---
@@ -31,10 +30,16 @@ import org.apache.spark.util.ListenerBus
  *
  * Implementations should throw [[NoSuchDatabaseException]] when databases 
don't exist.
  */
-abstract class ExternalCatalog
-  extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] {
+trait ExternalCatalog {
   import CatalogTypes.TablePartitionSpec
 
+  // Returns the underlying catalog class (e.g., HiveExternalCatalog).
+  def unwrapped: ExternalCatalog = this
--- End diff --

Maybe we can move it to `ExternalCatalogWithListener` and mark 
`SharedState.externalCatalog` as `ExternalCatalogWithListener`


---




[GitHub] spark pull request #21100: [SPARK-24012][SQL] Union of map and other compati...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21100#discussion_r183270011
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -111,6 +111,18 @@ object TypeCoercion {
 val dataType = findTightestCommonType(f1.dataType, f2.dataType).get
 StructField(f1.name, dataType, nullable = f1.nullable || 
f2.nullable)
   }))
+case (a1 @ ArrayType(et1, containsNull1), a2 @ ArrayType(et2, 
containsNull2))
+  if a1.sameType(a2) =>
+  findTightestCommonType(et1, et2).map(ArrayType(_, containsNull1 || 
containsNull2))
+case (m1 @ MapType(keyType1, valueType1, n1), m2 @ MapType(keyType2, 
valueType2, n2))
--- End diff --

ditto: `kt1`, `vt1`, `hasNull1`


---




[GitHub] spark pull request #21100: [SPARK-24012][SQL] Union of map and other compati...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21100#discussion_r183270045
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -111,6 +111,18 @@ object TypeCoercion {
 val dataType = findTightestCommonType(f1.dataType, f2.dataType).get
 StructField(f1.name, dataType, nullable = f1.nullable || 
f2.nullable)
   }))
+case (a1 @ ArrayType(et1, containsNull1), a2 @ ArrayType(et2, 
containsNull2))
+  if a1.sameType(a2) =>
+  findTightestCommonType(et1, et2).map(ArrayType(_, containsNull1 || 
containsNull2))
+case (m1 @ MapType(keyType1, valueType1, n1), m2 @ MapType(keyType2, 
valueType2, n2))
+  if m1.sameType(m2) =>
+  val keyType = findTightestCommonType(keyType1, keyType2)
+  val valueType = findTightestCommonType(valueType1, valueType2)
+  if(keyType.isEmpty || valueType.isEmpty) {
--- End diff --

We don't need this; it's guaranteed by `m1.sameType(m2)`.


---




[GitHub] spark pull request #21100: [SPARK-24012][SQL] Union of map and other compati...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21100#discussion_r183269995
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -111,6 +111,18 @@ object TypeCoercion {
 val dataType = findTightestCommonType(f1.dataType, f2.dataType).get
 StructField(f1.name, dataType, nullable = f1.nullable || 
f2.nullable)
   }))
+case (a1 @ ArrayType(et1, containsNull1), a2 @ ArrayType(et2, 
containsNull2))
--- End diff --

We can shorten the names here: `hasNull1`, `hasNull2`.


---




[GitHub] spark pull request #21100: [SPARK-24012][SQL] Union of map and other compati...

2018-04-22 Thread liutang123
Github user liutang123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21100#discussion_r183269702
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -171,6 +171,15 @@ object TypeCoercion {
   .orElse((t1, t2) match {
 case (ArrayType(et1, containsNull1), ArrayType(et2, 
containsNull2)) =>
   findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || 
containsNull2))
+case (MapType(keyType1, valueType1, n1), MapType(keyType2, 
valueType2, n2))
--- End diff --

Hi, I implemented this logic in `findTightestCommonType`; looking forward to 
further review.


---




[GitHub] spark issue #21100: [SPARK-24012][SQL] Union of map and other compatible col...

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21100
  
**[Test build #89696 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89696/testReport)**
 for PR 21100 at commit 
[`19b5c6a`](https://github.com/apache/spark/commit/19b5c6a84b38b4ce275093f79eee0ff594e50f90).


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183269323
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -794,4 +794,17 @@ class CachedTableSuite extends QueryTest with 
SQLTestUtils with SharedSQLContext
   }
 }
   }
+
+  private def isMaterialized(df: DataFrame): Boolean = {
+val nodes = df.queryExecution.executedPlan.collect { case c: 
InMemoryTableScanExec => c }
+assert(nodes.nonEmpty, "DataFrame is not cached\n" + 
df.queryExecution.analyzed)
+nodes.forall(_.relation.cacheBuilder._cachedColumnBuffers != null)
+  }
+
+  test("SPARK-23880 table cache should be lazy and don't trigger any 
jobs") {
--- End diff --

I feel it's clearer to create a listener and explicitly show that we don't 
trigger any jobs after calling `Dataset.cache`.
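
A minimal sketch of that idea, assuming a running `SparkSession` named `spark` (this is not the PR's actual test, just an illustration of counting jobs around `Dataset.cache()` with a `SparkListener`):

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

val jobCount = new AtomicInteger(0)
val listener = new SparkListener {
  // Count every job submitted to the scheduler while the listener is registered.
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = jobCount.incrementAndGet()
}
spark.sparkContext.addSparkListener(listener)
try {
  spark.range(10).cache()  // should submit no Spark job if caching is lazy
  Thread.sleep(1000)       // crude wait for listener events; a real test would drain the listener bus
  assert(jobCount.get() == 0, s"cache() triggered ${jobCount.get()} job(s)")
} finally {
  spark.sparkContext.removeSparkListener(listener)
}
```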


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183269227
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -794,4 +794,17 @@ class CachedTableSuite extends QueryTest with 
SQLTestUtils with SharedSQLContext
   }
 }
   }
+
+  private def isMaterialized(df: DataFrame): Boolean = {
+val nodes = df.queryExecution.executedPlan.collect { case c: 
InMemoryTableScanExec => c }
+assert(nodes.nonEmpty, "DataFrame is not cached\n" + 
df.queryExecution.analyzed)
+nodes.forall(_.relation.cacheBuilder._cachedColumnBuffers != null)
+  }
+
+  test("SPARK-23880 table cache should be lazy and don't trigger any 
jobs") {
--- End diff --

how does this test prove we don't trigger jobs?


---




[GitHub] spark pull request #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to b...

2018-04-22 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21122#discussion_r183269033
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
 ---
@@ -31,10 +30,16 @@ import org.apache.spark.util.ListenerBus
  *
  * Implementations should throw [[NoSuchDatabaseException]] when databases 
don't exist.
  */
-abstract class ExternalCatalog
-  extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] {
+trait ExternalCatalog {
--- End diff --

Based on your JIRA comment, can we put `@DeveloperApi` or 
`@InterfaceStability.Unstable` in this PR?


---




[GitHub] spark pull request #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to b...

2018-04-22 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21122#discussion_r183268517
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
 ---
@@ -31,10 +30,16 @@ import org.apache.spark.util.ListenerBus
  *
  * Implementations should throw [[NoSuchDatabaseException]] when databases 
don't exist.
  */
-abstract class ExternalCatalog
-  extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] {
+trait ExternalCatalog {
   import CatalogTypes.TablePartitionSpec
 
+  // Returns the underlying catalog class (e.g., HiveExternalCatalog).
+  def unwrapped: ExternalCatalog = this
--- End diff --

@gatorsmile . We had better skip the default implementation here. What do 
you think about that?


---




[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21018#discussion_r183268807
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -55,56 +42,38 @@ object InMemoryRelation {
 private[columnar]
 case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: 
InternalRow)
 
-case class InMemoryRelation(
-output: Seq[Attribute],
+case class CachedRDDBuilder(
 useCompression: Boolean,
 batchSize: Int,
 storageLevel: StorageLevel,
 @transient child: SparkPlan,
 tableName: Option[String])(
-@transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-val sizeInBytesStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator,
-statsOfPlanToCache: Statistics,
-override val outputOrdering: Seq[SortOrder])
-  extends logical.LeafNode with MultiInstanceRelation {
-
-  override protected def innerChildren: Seq[SparkPlan] = Seq(child)
-
-  override def doCanonicalize(): logical.LogicalPlan =
-copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
-  storageLevel = StorageLevel.NONE,
-  child = child.canonicalized,
-  tableName = None)(
-  _cachedColumnBuffers,
-  sizeInBytesStats,
-  statsOfPlanToCache,
-  outputOrdering)
-
-  override def producedAttributes: AttributeSet = outputSet
+@transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = 
null) {
 
-  @transient val partitionStatistics = new PartitionStatistics(output)
+  val sizeInBytesStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator
 
-  override def computeStats(): Statistics = {
-if (sizeInBytesStats.value == 0L) {
-  // Underlying columnar RDD hasn't been materialized, use the stats 
from the plan to cache.
-  // Note that we should drop the hint info here. We may cache a plan 
whose root node is a hint
-  // node. When we lookup the cache with a semantically same plan 
without hint info, the plan
-  // returned by cache lookup should not have hint info. If we lookup 
the cache with a
-  // semantically same plan with a different hint info, 
`CacheManager.useCachedData` will take
-  // care of it and retain the hint info in the lookup input plan.
-  statsOfPlanToCache.copy(hints = HintInfo())
-} else {
-  Statistics(sizeInBytes = sizeInBytesStats.value.longValue)
+  def cachedColumnBuffers: RDD[CachedBatch] = {
+if (_cachedColumnBuffers == null) {
+  synchronized {
+if (_cachedColumnBuffers == null) {
+  _cachedColumnBuffers = buildBuffers()
+}
+  }
 }
+_cachedColumnBuffers
   }
 
-  // If the cached column buffers were not passed in, we calculate them in 
the constructor.
-  // As in Spark, the actual work of caching is lazy.
-  if (_cachedColumnBuffers == null) {
-buildBuffers()
+  def clearCache(blocking: Boolean = true): Unit = {
+if (_cachedColumnBuffers != null) {
+  synchronized {
+if (_cachedColumnBuffers != null) {
+  _cachedColumnBuffers.unpersist(blocking)
--- End diff --

Shall we also set `_cachedColumnBuffers = null` so that `unpersist` won't 
be called twice?


---




[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...

2018-04-22 Thread ConcurrencyPractitioner
Github user ConcurrencyPractitioner commented on the issue:

https://github.com/apache/spark/pull/21124
  
+1


---




[GitHub] spark issue #19222: [SPARK-10399][SPARK-23879][CORE][SQL] Introduce multiple...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19222
  
Instead of round-robining the memory block type across iterations, can we just 
operate on all the memory blocks in each iteration? Then we can remove the 
`if-else` and make the benchmark focus more on the memory blocks.

As a comparison, we can create a byte array, a long array, and an off-heap 
array, and operate on all of them in each iteration.
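
A rough, self-contained sketch of that shape (the sizes, iteration count, and the use of a direct `ByteBuffer` as the off-heap stand-in are all made up for illustration, not taken from the PR's benchmark):

```scala
import java.nio.ByteBuffer

object MemoryKindsBenchmarkSketch {
  def main(args: Array[String]): Unit = {
    val n = 1 << 20
    val bytes = new Array[Byte](n)             // on-heap byte array
    val longs = new Array[Long](n)             // on-heap long array
    val offHeap = ByteBuffer.allocateDirect(n) // stands in for an off-heap memory block

    val start = System.nanoTime()
    var iter = 0
    while (iter < 10) {
      var i = 0
      while (i < n) {
        // Touch every memory kind in every iteration instead of round-robining the kind.
        bytes(i) = 1
        longs(i) = i
        offHeap.put(i, 1.toByte)
        i += 1
      }
      iter += 1
    }
    println(s"took ${(System.nanoTime() - start) / 1e6} ms; ${bytes(0)} ${longs(1)} ${offHeap.get(0)}")
  }
}
```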


---




[GitHub] spark issue #21111: [SPARK-23877][SQL][followup] use PhysicalOperation to si...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/2
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2570/
Test PASSed.


---




[GitHub] spark issue #21111: [SPARK-23877][SQL][followup] use PhysicalOperation to si...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/2
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21111: [SPARK-23877][SQL][followup] use PhysicalOperation to si...

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/2
  
**[Test build #89695 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89695/testReport)**
 for PR 2 at commit 
[`d624955`](https://github.com/apache/spark/commit/d624955aa7fd07acde698a50d05ed5679ee91533).


---




[GitHub] spark pull request #21117: [followup][SPARK-10399][SPARK-23879][Core] Free u...

2018-04-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21117


---




[GitHub] spark pull request #21111: [SPARK-23877][SQL][followup] use PhysicalOperatio...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/2#discussion_r183266741
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 ---
@@ -114,11 +119,8 @@ case class OptimizeMetadataOnlyQuery(catalog: 
SessionCatalog) extends Rule[Logic
 relation match {
   case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, 
isStreaming) =>
 val partAttrs = 
getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-val partitionData = fsRelation.location.listFiles(relFilters, 
Nil)
-// partition data may be a stream, which can cause 
serialization to hit stack level too
-// deep exceptions because it is a recursive structure in 
memory. converting to array
-// avoids the problem.
--- End diff --

> Would it be reasonable for a future commit to remove the @transient 
modifier and re-introduce the problem?

That's very unlikely. SPARK-21884 guarantees Spark won't serialize the rows, 
and we have regression tests to protect us. BTW, it would be a lot of work to 
make sure all the places that create `LocalRelation` do not use a recursive 
structure. I'll add some comments to `LocalRelation` to emphasize this.


---




[GitHub] spark issue #21117: [followup][SPARK-10399][SPARK-23879][Core] Free unused o...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/21117
  
thanks, merging to master!


---




[GitHub] spark issue #21125: [Spark-24024] Fix poisson deviance calculations in GLM t...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21125
  
Can one of the admins verify this patch?


---




[GitHub] spark issue #21125: [Spark-24024] Fix poisson deviance calculations in GLM t...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21125
  
Can one of the admins verify this patch?


---




[GitHub] spark issue #14083: [SPARK-16406][SQL] Improve performance of LogicalPlan.re...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/14083
  
LGTM


---




[GitHub] spark pull request #14083: [SPARK-16406][SQL] Improve performance of Logical...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14083#discussion_r183265525
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
 ---
@@ -138,6 +140,88 @@ package object expressions  {
 def indexOf(exprId: ExprId): Int = {
   Option(exprIdToOrdinal.get(exprId)).getOrElse(-1)
 }
+
+private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, 
Seq[Attribute]] = {
+  m.mapValues(_.distinct).map(identity)
+}
+
+/** Map to use for direct case insensitive attribute lookups. */
+@transient private lazy val direct: Map[String, Seq[Attribute]] = {
+  unique(attrs.groupBy(_.name.toLowerCase))
+}
+
+/** Map to use for qualified case insensitive attribute lookups. */
+@transient private val qualified: Map[(String, String), 
Seq[Attribute]] = {
+  val grouped = attrs.filter(_.qualifier.isDefined).groupBy { a =>
+(a.qualifier.get.toLowerCase, a.name.toLowerCase)
+  }
+  unique(grouped)
+}
+
+/** Perform attribute resolution given a name and a resolver. */
+def resolve(nameParts: Seq[String], resolver: Resolver): 
Option[NamedExpression] = {
+  // Collect matching attributes given a name and a lookup.
+  def collectMatches(name: String, candidates: 
Option[Seq[Attribute]]): Seq[Attribute] = {
+candidates.toSeq.flatMap(_.collect {
+  case a if resolver(a.name, name) => a.withName(name)
+})
+  }
+
+  // Find matches for the given name assuming that the 1st part is a 
qualifier (i.e. table name,
+  // alias, or subquery alias) and the 2nd part is the actual name. 
This returns a tuple of
+  // matched attributes and a list of parts that are to be resolved.
+  //
+  // For example, consider an example where "a" is the table name, "b" 
is the column name,
+  // and "c" is the struct field name, i.e. "a.b.c". In this case, 
Attribute will be "a.b",
+  // and the second element will be List("c").
+  val matches = nameParts match {
+case qualifier +: name +: nestedFields =>
+  val key = (qualifier.toLowerCase, name.toLowerCase)
+  val attributes = collectMatches(name, qualified.get(key)).filter 
{ a =>
+resolver(qualifier, a.qualifier.get)
+  }
+  (attributes, nestedFields)
+case all =>
+  (Nil, all)
+  }
+
+  // If none of attributes match `table.column` pattern, we try to 
resolve it as a column.
+  val (candidates, nestedFields) = matches match {
+case (Seq(), _) =>
+  val name = nameParts.head
+  val attributes = collectMatches(name, 
direct.get(name.toLowerCase))
+  (attributes, nameParts.tail)
+case _ => matches
+  }
+
+  def name = UnresolvedAttribute(nameParts).name
+  candidates match {
+case Seq(a) if nestedFields.nonEmpty =>
+  // One match, but we also need to extract the requested nested 
field.
+  // The foldLeft adds ExtractValues for every remaining parts of 
the identifier,
+  // and aliased it with the last part of the name.
+  // For example, consider "a.b.c", where "a" is resolved to an 
existing attribute.
+  // Then this will add ExtractValue("c", ExtractValue("b", a)), 
and alias the final
+  // expression as "c".
+  val fieldExprs = nestedFields.foldLeft(a: Expression) { (e, 
name) =>
+ExtractValue(e, Literal(name), resolver)
+  }
+  Some(Alias(fieldExprs, nestedFields.last)())
+
+case Seq(a) =>
+  // One match, no nested fields, use it.
+  Some(a)
+
+case Seq() =>
+  // No matches.
+  None
+
+case ambiguousReferences =>
+  // More than one match.
+  val referenceNames = ambiguousReferences.mkString(", ")
--- End diff --

to pass the test, we should follow the previous code: 
`ambiguousReferences.map(_._1.qualifiedName).mkString(", ")`


---




[GitHub] spark pull request #21125: [Spark-24024] Fix poisson deviance calculations i...

2018-04-22 Thread tengpeng
GitHub user tengpeng opened a pull request:

https://github.com/apache/spark/pull/21125

[Spark-24024] Fix poisson deviance calculations in GLM to handle y = 0

## What changes were proposed in this pull request?

It is reported by Spark users that the deviance calculation does not handle 
y = 0, so the correct model summary cannot be obtained. The user has confirmed 
that the issue is in
override def deviance(y: Double, mu: Double, weight: Double): Double =
{ 2.0 * weight * (y * math.log(y / mu) - (y - mu)) }
when y = 0.

The user also mentioned there are many other places he believes we should 
check for the same thing. However, no other changes are needed, including the 
Gamma distribution.
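
For illustration, a minimal sketch of one way to guard the y = 0 case (not necessarily the exact change in this PR): the term y * math.log(y / mu) should tend to 0 as y approaches 0, but evaluating it directly gives 0 * -Infinity = NaN.

```scala
// Sketch only: treat y * log(y / mu) as 0 when y == 0 so the deviance stays finite.
def deviance(y: Double, mu: Double, weight: Double): Double = {
  val ylogy = if (y == 0.0) 0.0 else y * math.log(y / mu)
  2.0 * weight * (ylogy - (y - mu))
}
```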

## How was this patch tested?
Added a comparison with R's deviance calculation to the existing unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tengpeng/spark Spark24024GLM

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21125.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21125


commit 3c6a4dab973851e385b6c9a2c77e5684ad6171a4
Author: Teng Peng 
Date:   2018-04-23T02:31:25Z

fix deviance calculation when y = 0




---




[GitHub] spark pull request #14083: [SPARK-16406][SQL] Improve performance of Logical...

2018-04-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14083#discussion_r183265417
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
 ---
@@ -138,6 +140,88 @@ package object expressions  {
 def indexOf(exprId: ExprId): Int = {
   Option(exprIdToOrdinal.get(exprId)).getOrElse(-1)
 }
+
+private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, 
Seq[Attribute]] = {
+  m.mapValues(_.distinct).map(identity)
+}
+
+/** Map to use for direct case insensitive attribute lookups. */
+@transient private lazy val direct: Map[String, Seq[Attribute]] = {
+  unique(attrs.groupBy(_.name.toLowerCase))
+}
+
+/** Map to use for qualified case insensitive attribute lookups. */
+@transient private val qualified: Map[(String, String), 
Seq[Attribute]] = {
+  val grouped = attrs.filter(_.qualifier.isDefined).groupBy { a =>
+(a.qualifier.get.toLowerCase, a.name.toLowerCase)
+  }
+  unique(grouped)
+}
+
+/** Perform attribute resolution given a name and a resolver. */
+def resolve(nameParts: Seq[String], resolver: Resolver): 
Option[NamedExpression] = {
+  // Collect matching attributes given a name and a lookup.
+  def collectMatches(name: String, candidates: 
Option[Seq[Attribute]]): Seq[Attribute] = {
+candidates.toSeq.flatMap(_.collect {
+  case a if resolver(a.name, name) => a.withName(name)
+})
+  }
+
+  // Find matches for the given name assuming that the 1st part is a 
qualifier (i.e. table name,
+  // alias, or subquery alias) and the 2nd part is the actual name. 
This returns a tuple of
+  // matched attributes and a list of parts that are to be resolved.
+  //
+  // For example, consider an example where "a" is the table name, "b" 
is the column name,
+  // and "c" is the struct field name, i.e. "a.b.c". In this case, 
Attribute will be "a.b",
+  // and the second element will be List("c").
+  val matches = nameParts match {
+case qualifier +: name +: nestedFields =>
+  val key = (qualifier.toLowerCase, name.toLowerCase)
+  val attributes = collectMatches(name, qualified.get(key)).filter 
{ a =>
+resolver(qualifier, a.qualifier.get)
+  }
+  (attributes, nestedFields)
+case all =>
+  (Nil, all)
+  }
+
+  // If none of attributes match `table.column` pattern, we try to 
resolve it as a column.
+  val (candidates, nestedFields) = matches match {
+case (Seq(), _) =>
+  val name = nameParts.head
+  val attributes = collectMatches(name, 
direct.get(name.toLowerCase))
+  (attributes, nameParts.tail)
+case _ => matches
+  }
+
+  def name = UnresolvedAttribute(nameParts).name
+  candidates match {
+case Seq(a) if nestedFields.nonEmpty =>
+  // One match, but we also need to extract the requested nested 
field.
+  // The foldLeft adds ExtractValues for every remaining parts of 
the identifier,
+  // and aliased it with the last part of the name.
+  // For example, consider "a.b.c", where "a" is resolved to an 
existing attribute.
+  // Then this will add ExtractValue("c", ExtractValue("b", a)), 
and alias the final
+  // expression as "c".
+  val fieldExprs = nestedFields.foldLeft(a: Expression) { (e, 
name) =>
+ExtractValue(e, Literal(name), resolver)
--- End diff --

`ExtractValue` has the same perf problem, but this can be fixed in a follow-up.


---




[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21082
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89693/
Test PASSed.


---




[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21082
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21082
  
**[Test build #89693 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89693/testReport)**
 for PR 21082 at commit 
[`27158d9`](https://github.com/apache/spark/commit/27158d9873d54de9312db9e2db5c88d430589ade).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...

2018-04-22 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21124
  
+1


---




[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...

2018-04-22 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/21124
  
@brkyvz PTAL.


---




[GitHub] spark issue #21124: SPARK-23004

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21124
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2569/
Test PASSed.


---




[GitHub] spark issue #21124: SPARK-23004

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21124
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21124: SPARK-23004

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21124
  
**[Test build #89694 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89694/testReport)**
 for PR 21124 at commit 
[`304498e`](https://github.com/apache/spark/commit/304498eaf3ea0bd8a52a150257dc8b38a11c4108).


---




[GitHub] spark pull request #21124: SPARK-23004

2018-04-22 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/21124

SPARK-23004

## What changes were proposed in this pull request?

A structured streaming query with a streaming aggregation can throw the 
following error in rare cases. 
```
java.lang.IllegalStateException: Cannot remove after already committed or aborted
  at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659)
  at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.remove(HDFSBackedStateStoreProvider.scala:143)
  at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:233)
  at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
  at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  ...
```
This can happen when the following conditions are accidentally hit:
 1. A streaming aggregation uses an aggregation function that is a subclass of [`TypedImperativeAggregate`](https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L473) (for example, `collect_set`, `collect_list`, `percentile`, etc.).
 2. The query runs in `update` mode.
 3. After the shuffle, a partition has exactly 128 records.

This happens because of the following:
 1. The `StateStoreSaveExec` used in streaming aggregations has the [following logic](https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L359) when used in `update` mode.
    - There is an iterator that reads data from its parent iterator and updates the StateStore.
    - When the parent iterator is fully consumed (i.e. `baseIterator.hasNext` returns false), all state changes are committed by calling `StateStore.commit`.
    - The implementation of `StateStore.commit()` in `HDFSBackedStateStore` does not allow itself to be called twice. However, the logic is such that if `hasNext` is called multiple times after `baseIterator.hasNext` has returned false, each call will invoke `StateStore.commit`.
    - For most aggregation functions this is okay, because `hasNext` is only called once. But that is not the case with `TypedImperativeAggregate`s.

 2. `TypedImperativeAggregate`s are executed using `ObjectHashAggregateExec`, which tries to use two kinds of hashmaps for aggregation.
    - It first tries an unsorted hashmap. If the size of the hashmap grows beyond a certain threshold (default 128), it switches to a sorted hashmap.
    - The [switching logic](https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala) in `ObjectAggregationIterator` (used by `ObjectHashAggregateExec`) is such that when the number of records matches the threshold (i.e. 128), it ends up calling `iterator.hasNext` twice.

When these two conditions are combined, it leads to the above error. This latent bug has existed since Spark 2.1, when `ObjectHashAggregateExec` was introduced.

The solution is to use `NextIterator` or `CompletionIterator`, each of 
which has a flag to prevent the "onCompletion" tasks from being called more 
than once. In this PR, I chose to implement using `NextIterator`.
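A simplified sketch of the flag-guarded completion idea (again only a model under that assumption; Spark's actual `NextIterator`/`CompletionIterator` implementations differ in detail):

```scala
// Simplified model of the fix: the completion action (e.g. StateStore.commit)
// is guarded by a flag, so it runs at most once no matter how many times
// hasNext() is called after the parent iterator is exhausted.
class CommitOnceIterator[T](parent: Iterator[T], onCompletion: () => Unit) extends Iterator[T] {
  private var completed = false

  override def hasNext: Boolean = {
    val has = parent.hasNext
    if (!has && !completed) {
      completed = true
      onCompletion()
    }
    has
  }

  override def next(): T = parent.next()
}
```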

## How was this patch tested?



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-23004

Alternatively you can review and apply these changes as the patch at:


[GitHub] spark issue #6300: [SPARK-7127] [MLLIB] Adding broadcast of model before pre...

2018-04-22 Thread Tamilselvan-Veeramani
Github user Tamilselvan-Veeramani commented on the issue:

https://github.com/apache/spark/pull/6300
  
Can anyone help me with how to use the transformImpl method for the 
predictProbability method? I see it's not implemented in transformImpl of 
RandomForestClassificationModel, so my streaming job broadcasts the RF 
model for every mini-batch. Please help me with a way to implement this. Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183257701
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-if (!dataset.schema.fieldNames.contains($(inputCol))) {
-  logInfo(s"Input column ${$(inputCol)} does not exist during 
transformation. " +
-"Skip StringIndexerModel.")
-  return dataset.toDF
-}
-transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
 
-val filteredLabels = getHandleInvalid match {
-  case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-  case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: 
Seq[String]): Dataset[_] = {
+var filteredDataset = dataset.na.drop(inputColNames.filter(
+  dataset.schema.fieldNames.contains(_)))
--- End diff --

Add one empty line for readability.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183255152
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +161,53 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => 
values.countByValue().toSeq.sortBy(-_._2)
-.map(_._1).toArray
-  case StringIndexer.frequencyAsc => 
values.countByValue().toSeq.sortBy(_._2)
-.map(_._1).toArray
-  case StringIndexer.alphabetDesc => 
values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ 
< _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+val zeroState = Array.fill(inputCols.length)(new OpenHashMap[String, 
Long]())
+
+// Counts by the string values in the dataset.
+val countByValueArray = dataset.na.drop(inputCols)
+  .select(inputCols.map(col(_).cast(StringType)): _*)
+  .rdd.treeAggregate(zeroState)(
+(state: Array[OpenHashMap[String, Long]], row: Row) => {
+  for (i <- 0 until inputCols.length) {
+state(i).changeValue(row.getString(i), 1L, _ + 1)
+  }
+  state
+},
+(state1: Array[OpenHashMap[String, Long]], state2: 
Array[OpenHashMap[String, Long]]) => {
+  for (i <- 0 until inputCols.length) {
+state2(i).foreach { case (key: String, count: Long) =>
+  state1(i).changeValue(key, count, _ + count)
+}
+  }
+  state1
+}
+  )
+
+// In case of equal frequency when frequencyDesc/Asc, we further sort 
the strings by alphabet.
+val labelsArray = countByValueArray.map { countByValue =>
+  $(stringOrderType) match {
+case StringIndexer.frequencyDesc =>
+  countByValue.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray
+case StringIndexer.frequencyAsc =>
+  countByValue.toSeq.sortBy(_._1).sortBy(_._2).map(_._1).toArray
+case StringIndexer.alphabetDesc => 
countByValue.toSeq.map(_._1).sortWith(_ > _).toArray
--- End diff --

I think we can break the code into two paths. One is sorting by frequency, 
which requires computing the counts, and the other is sorting by alphabet, 
which only requires the distinct values. We could move the `countByValueArray` 
code into `labelsArray`.
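For illustration, a rough sketch of that split over plain Scala collections (illustrative only; the PR operates on an RDD of rows with `OpenHashMap`s, and the names here are made up): only the frequency orderings need counts, while the alphabetical orderings only need the distinct values.

```scala
// Plain-collections sketch: frequency orderings compute counts,
// alphabetical orderings only compute distinct values.
def orderLabels(values: Seq[String], orderType: String): Array[String] = orderType match {
  case "frequencyDesc" | "frequencyAsc" =>
    // Count occurrences, then sort by count (ties broken alphabetically).
    val counts = values.groupBy(identity).map { case (k, v) => (k, v.size.toLong) }.toSeq
    val sorted =
      if (orderType == "frequencyDesc") counts.sortBy(_._1).sortBy(-_._2)
      else counts.sortBy(_._1).sortBy(_._2)
    sorted.map(_._1).toArray
  case "alphabetDesc" => values.distinct.sorted(Ordering[String].reverse).toArray
  case "alphabetAsc"  => values.distinct.sorted.toArray
}
```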


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183253488
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params 
with HasHandleInvalid wi
   @Since("2.3.0")
   def getStringOrderType: String = $(stringOrderType)
 
-  /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType): StructType 
= {
-val inputColName = $(inputCol)
+  /** Returns the input and output column names corresponding in pair. */
+  private[feature] def getInOutCols(): (Array[String], Array[String]) = {
+ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), 
Seq(outputCols))
+
+if (isSet(inputCol)) {
--- End diff --

If both `inputCol` and `inputCols` are set, throw an exception. 
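A tiny sketch of the check being asked for (hypothetical helper, not the actual `ParamValidators` code):

```scala
// Hypothetical sketch: reject the ambiguous case where both the single-column
// and the multi-column input params are set.
def checkInputColExclusive(inputColSet: Boolean, inputColsSet: Boolean): Unit = {
  require(!(inputColSet && inputColsSet),
    "Only one of `inputCol` and `inputCols` should be set, not both.")
}
```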


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r18325
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-if (!dataset.schema.fieldNames.contains($(inputCol))) {
-  logInfo(s"Input column ${$(inputCol)} does not exist during 
transformation. " +
-"Skip StringIndexerModel.")
-  return dataset.toDF
-}
-transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
 
-val filteredLabels = getHandleInvalid match {
-  case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-  case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: 
Seq[String]): Dataset[_] = {
+var filteredDataset = dataset.na.drop(inputColNames.filter(
+  dataset.schema.fieldNames.contains(_)))
+for (i <- 0 until inputColNames.length) {
+  val inputColName = inputColNames(i)
+  val labelToIndex = labelsToIndexArray(i)
+  val filterer = udf { label: String =>
+labelToIndex.contains(label)
--- End diff --

Isn't this very expensive? We basically look up `labelToIndex` twice.

It would be cool if we supported `lit(Map())` so we could do those lookups 
natively in SQL, and also handle `na.drop` together in whole-stage codegen. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183253932
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params 
with HasHandleInvalid wi
   @Since("2.3.0")
   def getStringOrderType: String = $(stringOrderType)
 
-  /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType): StructType 
= {
-val inputColName = $(inputCol)
+  /** Returns the input and output column names corresponding in pair. */
+  private[feature] def getInOutCols(): (Array[String], Array[String]) = {
+ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), 
Seq(outputCols))
+
+if (isSet(inputCol)) {
+  (Array($(inputCol)), Array($(outputCol)))
+} else {
+  require($(inputCols).length == $(outputCols).length,
+"The number of input columns does not match output columns")
+  ($(inputCols), $(outputCols))
+}
+  }
+
+  private def validateAndTransformField(
+  schema: StructType,
+  inputColName: String,
+  outputColName: String): StructField = {
 val inputDataType = schema(inputColName).dataType
 require(inputDataType == StringType || 
inputDataType.isInstanceOf[NumericType],
   s"The input column $inputColName must be either string type or 
numeric type, " +
 s"but got $inputDataType.")
-val inputFields = schema.fields
-val outputColName = $(outputCol)
-require(inputFields.forall(_.name != outputColName),
+require(schema.fields.forall(_.name != outputColName),
   s"Output column $outputColName already exists.")
-val attr = NominalAttribute.defaultAttr.withName($(outputCol))
-val outputFields = inputFields :+ attr.toStructField()
-StructType(outputFields)
+NominalAttribute.defaultAttr.withName($(outputCol)).toStructField()
+  }
+
+  /** Validates and transforms the input schema. */
+  protected def validateAndTransformSchema(
+  schema: StructType,
+  skipNonExistsCol: Boolean = false): StructType = {
+val (inputColNames, outputColNames) = getInOutCols()
+
+val outputFields = for (i <- 0 until inputColNames.length) yield {
+  if (schema.fieldNames.contains(inputColNames(i))) {
+validateAndTransformField(schema, inputColNames(i), 
outputColNames(i))
+  } else {
+if (skipNonExistsCol) {
+  null
+} else {
+  throw new SparkException(s"Input column ${inputColNames(i)} does 
not exist.")
+}
+  }
+}
+StructType(schema.fields ++ outputFields.filter(_ != null))
--- End diff --

Then you don't need to filter with the above code.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183257676
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-if (!dataset.schema.fieldNames.contains($(inputCol))) {
-  logInfo(s"Input column ${$(inputCol)} does not exist during 
transformation. " +
-"Skip StringIndexerModel.")
-  return dataset.toDF
-}
-transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
 
-val filteredLabels = getHandleInvalid match {
-  case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-  case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: 
Seq[String]): Dataset[_] = {
--- End diff --

Please add a comment explaining what counts as invalid data here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183254078
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +161,53 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => 
values.countByValue().toSeq.sortBy(-_._2)
-.map(_._1).toArray
-  case StringIndexer.frequencyAsc => 
values.countByValue().toSeq.sortBy(_._2)
-.map(_._1).toArray
-  case StringIndexer.alphabetDesc => 
values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ 
< _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+val zeroState = Array.fill(inputCols.length)(new OpenHashMap[String, 
Long]())
+
+// Counts by the string values in the dataset.
+val countByValueArray = dataset.na.drop(inputCols)
+  .select(inputCols.map(col(_).cast(StringType)): _*)
+  .rdd.treeAggregate(zeroState)(
--- End diff --

Is it possible to aggregate natively with SQL? I don't think we would 
compromise performance by using SQL aggregation (`groupBy`, `agg`, 
`countDistinct`) instead of tree aggregation, since the state will be very 
small in this use case.
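A hedged sketch of what the SQL-native counting could look like (illustrative only, not the PR's implementation; the object name and sample data are made up):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Illustrative sketch: counting label frequencies per input column with plain
// DataFrame aggregation instead of rdd.treeAggregate.
object SqlCountSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    val dataset = Seq(("a", "x"), ("b", "x"), ("a", "y")).toDF("c1", "c2")
    val inputCols = Array("c1", "c2")

    // One small (label -> count) map per input column; the aggregation state stays tiny.
    val countsPerColumn = inputCols.map { c =>
      dataset.na.drop(Seq(c))
        .groupBy(col(c).cast("string").as("label"))
        .count()
        .collect()
        .map(row => row.getString(0) -> row.getLong(1))
        .toMap
    }
    countsPerColumn.foreach(println)
    spark.stop()
  }
}
```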


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183258353
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-if (!dataset.schema.fieldNames.contains($(inputCol))) {
-  logInfo(s"Input column ${$(inputCol)} does not exist during 
transformation. " +
-"Skip StringIndexerModel.")
-  return dataset.toDF
-}
-transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
 
-val filteredLabels = getHandleInvalid match {
-  case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-  case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: 
Seq[String]): Dataset[_] = {
+var filteredDataset = dataset.na.drop(inputColNames.filter(
+  dataset.schema.fieldNames.contains(_)))
+for (i <- 0 until inputColNames.length) {
+  val inputColName = inputColNames(i)
+  val labelToIndex = labelsToIndexArray(i)
+  val filterer = udf { label: String =>
+labelToIndex.contains(label)
+  }
+  filteredDataset = 
filteredDataset.where(filterer(dataset(inputColName)))
 }
+filteredDataset
+  }
 
-val metadata = NominalAttribute.defaultAttr
-  .withName($(outputCol)).withValues(filteredLabels).toMetadata()
-// If we are skipping invalid records, filter them out.
-val (filteredDataset, keepInvalid) = $(handleInvalid) match {
-  case StringIndexer.SKIP_INVALID =>
-val filterer = udf { label: String =>
-  labelToIndex.contains(label)
-}
-
(dataset.na.drop(Array($(inputCol))).where(filterer(dataset($(inputCol, 
false)
-  case _ => (dataset, getHandleInvalid == StringIndexer.KEEP_INVALID)
-}
+  private def getIndexer(labels: Seq[String], labelToIndex: 
OpenHashMap[String, Double]) = {
+val keepInvalid = (getHandleInvalid == StringIndexer.KEEP_INVALID)
 
-val indexer = udf { label: String =>
+udf { label: String =>
--- End diff --

This requires calling a separate UDF for each input column. Should we 
combine them into one UDF? The `filteredDataset` logic could be included as well 
to avoid multiple lookups. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183257799
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-if (!dataset.schema.fieldNames.contains($(inputCol))) {
-  logInfo(s"Input column ${$(inputCol)} does not exist during 
transformation. " +
-"Skip StringIndexerModel.")
-  return dataset.toDF
-}
-transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
 
-val filteredLabels = getHandleInvalid match {
-  case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-  case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: 
Seq[String]): Dataset[_] = {
+var filteredDataset = dataset.na.drop(inputColNames.filter(
+  dataset.schema.fieldNames.contains(_)))
+for (i <- 0 until inputColNames.length) {
+  val inputColName = inputColNames(i)
+  val labelToIndex = labelsToIndexArray(i)
+  val filterer = udf { label: String =>
+labelToIndex.contains(label)
+  }
+  filteredDataset = 
filteredDataset.where(filterer(dataset(inputColName)))
--- End diff --

Is it possible to avoid using `var filteredDataset`?
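One way to drop the `var`, sketched as a standalone helper (illustrative only; plain `Map`s stand in for the model's `OpenHashMap`s, and the function name is made up):

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.udf

// Sketch: fold the per-column filters over the dataset instead of mutating a var.
def filterInvalid(
    dataset: Dataset[_],
    inputColNames: Seq[String],
    labelToIndexByCol: Seq[Map[String, Double]]): Dataset[_] = {
  val start = dataset.na.drop(inputColNames.filter(dataset.schema.fieldNames.contains(_)))
  inputColNames.zip(labelToIndexByCol).foldLeft[Dataset[_]](start) {
    case (ds, (colName, labelToIndex)) =>
      val isKnown = udf { label: String => labelToIndex.contains(label) }
      ds.where(isKnown(ds(colName)))
  }
}
```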


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-04-22 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r183253904
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params 
with HasHandleInvalid wi
   @Since("2.3.0")
   def getStringOrderType: String = $(stringOrderType)
 
-  /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType): StructType 
= {
-val inputColName = $(inputCol)
+  /** Returns the input and output column names corresponding in pair. */
+  private[feature] def getInOutCols(): (Array[String], Array[String]) = {
+ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), 
Seq(outputCols))
+
+if (isSet(inputCol)) {
+  (Array($(inputCol)), Array($(outputCol)))
+} else {
+  require($(inputCols).length == $(outputCols).length,
+"The number of input columns does not match output columns")
+  ($(inputCols), $(outputCols))
+}
+  }
+
+  private def validateAndTransformField(
+  schema: StructType,
+  inputColName: String,
+  outputColName: String): StructField = {
 val inputDataType = schema(inputColName).dataType
 require(inputDataType == StringType || 
inputDataType.isInstanceOf[NumericType],
   s"The input column $inputColName must be either string type or 
numeric type, " +
 s"but got $inputDataType.")
-val inputFields = schema.fields
-val outputColName = $(outputCol)
-require(inputFields.forall(_.name != outputColName),
+require(schema.fields.forall(_.name != outputColName),
   s"Output column $outputColName already exists.")
-val attr = NominalAttribute.defaultAttr.withName($(outputCol))
-val outputFields = inputFields :+ attr.toStructField()
-StructType(outputFields)
+NominalAttribute.defaultAttr.withName($(outputCol)).toStructField()
+  }
+
+  /** Validates and transforms the input schema. */
+  protected def validateAndTransformSchema(
+  schema: StructType,
+  skipNonExistsCol: Boolean = false): StructType = {
+val (inputColNames, outputColNames) = getInOutCols()
+
+val outputFields = for (i <- 0 until inputColNames.length) yield {
--- End diff --

Nit, why not the following for readability? 

```scala
val outputFields = inputColNames.zip(outputColNames).flatMap {
  case (inputColName, outputColName) =>
    if (schema.fieldNames.contains(inputColName)) {
      Some(validateAndTransformField(schema, inputColName, outputColName))
    } else if (skipNonExistsCol) {
      None
    } else {
      throw new SparkException(s"Input column $inputColName does not exist.")
    }
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21056: [SPARK-23849][SQL] Tests for samplingRatio of json datas...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21056
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89692/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21056: [SPARK-23849][SQL] Tests for samplingRatio of json datas...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21056
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21056: [SPARK-23849][SQL] Tests for samplingRatio of json datas...

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21056
  
**[Test build #89692 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89692/testReport)**
 for PR 21056 at commit 
[`1b86df3`](https://github.com/apache/spark/commit/1b86df3293612ef1db80220c8d8e71a4b917a5c7).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20937
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20937
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89691/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20937
  
**[Test build #89691 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89691/testReport)**
 for PR 20937 at commit 
[`482b799`](https://github.com/apache/spark/commit/482b79969b9e0cc475e63b415051b32423facef4).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21071: [SPARK-21962][CORE] Distributed Tracing in Spark

2018-04-22 Thread devaraj-kavali
Github user devaraj-kavali closed the pull request at:

https://github.com/apache/spark/pull/21071


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-22 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183253723
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends 
Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", 
")})"
 }
+
+/**
+ * Returns the maximum value in the array.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by 
encapsulating elements into pairs with indexes indicating the order.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array("d", "a", null, "b"));
+   [("d",0),("a",1),(null,2),("b",3)]
+  > SELECT _FUNC_(array("d", "a", null, "b"), true);
+   [(0,"d"),(1,"a"),(2,null),(3,"b")]
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ZipWithIndex(child: Expression, indexFirst: Expression)
+  extends UnaryExpression with ExpectsInputTypes {
+
+  def this(e: Expression) = this(e, Literal.FalseLiteral)
+
+  private val idxFirst: Boolean = indexFirst match {
+case Literal(v: Boolean, BooleanType) => v
+case _ => throw new AnalysisException("The second argument has to be a 
boolean constant.")
+  }
+
+  private val MAX_ARRAY_LENGTH: Int = 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
+
+  lazy val childArrayType: ArrayType = 
child.dataType.asInstanceOf[ArrayType]
+
+  override def dataType: DataType = {
+val elementField = StructField("value", childArrayType.elementType, 
childArrayType.containsNull)
+val indexField = StructField("index", IntegerType, false)
+
+val fields = if (idxFirst) Seq(indexField, elementField) else 
Seq(elementField, indexField)
+
+ArrayType(StructType(fields), false)
+  }
+
+  override protected def nullSafeEval(input: Any): Any = {
+val array = 
input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType)
+
+val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) 
else InternalRow(v, i)
+val resultData = array.zipWithIndex.map{case (v, i) => makeStruct(v, 
i)}
+
+new GenericArrayData(resultData)
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
+nullSafeCodeGen(ctx, ev, c => {
+  if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) {
+genCodeForPrimitiveElements(ctx, c, ev.value)
+  } else {
+genCodeForNonPrimitiveElements(ctx, c, ev.value)
+  }
+})
+  }
+
+  private def genCodeForPrimitiveElements(
+  ctx: CodegenContext,
+  childVariableName: String,
+  arrayData: String): String = {
+val numElements = ctx.freshName("numElements")
+val byteArraySize = ctx.freshName("byteArraySize")
+val data = ctx.freshName("byteArray")
+val unsafeRow = ctx.freshName("unsafeRow")
+val structSize = ctx.freshName("structSize")
+val unsafeArrayData = ctx.freshName("unsafeArrayData")
+val structsOffset = ctx.freshName("structsOffset")
+val calculateArraySize = 
"UnsafeArrayData.calculateSizeOfUnderlyingByteArray"
+val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes"
+
+val baseOffset = Platform.BYTE_ARRAY_OFFSET
+val longSize = LongType.defaultSize
+val primitiveValueTypeName = 
CodeGenerator.primitiveTypeName(childArrayType.elementType)
+val (valuePosition, indexPosition) = if (idxFirst) ("1", "0") else 
("0", "1")
+
+s"""
+   |final int $numElements = $childVariableName.numElements();
+   |final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) 
+ longSize * 2};
+   |final long $byteArraySize = $calculateArraySize($numElements, 
$longSize + $structSize);
+   |final int $structsOffset = $calculateHeader($numElements) + 
$numElements * $longSize;
+   |if ($byteArraySize > $MAX_ARRAY_LENGTH) {
--- End diff --

I like your suggestion. So instead of throwing the exception, the function 
will execute code similar to that in `genCodeForNonPrimitiveElements`...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-22 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183253226
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends 
Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", 
")})"
 }
+
+/**
+ * Returns the maximum value in the array.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by 
encapsulating elements into pairs with indexes indicating the order.",
--- End diff --

That's a really good question! The newly added functions `element_at` and 
`array_position` are 1-based. But on the other hand, `getItem` from the 
`Column` class is 0-based. What about adding one extra parameter and letting users 
decide whether the array will be indexed from 0 or 1? 
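A plain-Scala sketch of that idea (illustrative only, not the Catalyst expression; the function name and parameters are made up): an extra parameter chooses the starting index, and `indexFirst` controls the pair order.

```scala
// Sketch: caller picks the start index (0- or 1-based) and whether the index
// comes first in each (value, index) pair.
def zipWithIndexSketch[T](
    values: Seq[T],
    indexFirst: Boolean = false,
    startIndex: Int = 0): Seq[(Any, Any)] = {
  values.zipWithIndex.map { case (v, i) =>
    if (indexFirst) (i + startIndex, v) else (v, i + startIndex)
  }
}

// zipWithIndexSketch(Seq("d", "a", "b"))                                  // Seq((d,0), (a,1), (b,2))
// zipWithIndexSketch(Seq("d", "a", "b"), indexFirst = true, startIndex = 1) // Seq((1,d), (2,a), (3,b))
```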


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-04-22 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21121#discussion_r183252854
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends 
Expression {
 
   override def sql: String = s"concat(${children.map(_.sql).mkString(", 
")})"
 }
+
+/**
+ * Returns the maximum value in the array.
--- End diff --

Good spot. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21082
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2568/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21082
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21082
  
**[Test build #89693 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89693/testReport)**
 for PR 21082 at commit 
[`27158d9`](https://github.com/apache/spark/commit/27158d9873d54de9312db9e2db5c88d430589ade).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21056: [SPARK-23849][SQL] Tests for samplingRatio of json datas...

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21056
  
**[Test build #89692 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89692/testReport)**
 for PR 21056 at commit 
[`1b86df3`](https://github.com/apache/spark/commit/1b86df3293612ef1db80220c8d8e71a4b917a5c7).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20937
  
**[Test build #89691 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89691/testReport)**
 for PR 20937 at commit 
[`482b799`](https://github.com/apache/spark/commit/482b79969b9e0cc475e63b415051b32423facef4).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21071: [SPARK-21962][CORE] Distributed Tracing in Spark

2018-04-22 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/21071
  
@devaraj-kavali can you close this PR first?

Looks like there isn't any reason to really use htrace anymore ...



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-04-22 Thread edwinalu
Github user edwinalu commented on the issue:

https://github.com/apache/spark/pull/20940
  
Could a committer please request a retest? It looks like the tests passed 
(https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89685/testReport/),
 and the failure occurs after posting to github:

Attempting to post to Github...
[error] running /home/jenkins/workspace/SparkPullRequestBuilder/build/sbt 
-Phadoop-2.6 -Pkubernetes -Pflume -Phive-thriftserver -Pyarn -Pkafka-0-8 -Phive 
-Pkinesis-asl -Pmesos test ; process was terminated by signal 9
 > Post successful.
Build step 'Execute shell' marked build as failure
Archiving artifacts
Recording test results
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89685/
Test FAILed.
Finished: FAILURE


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20933: [SPARK-23817][SQL]Migrate ORC file format read path to d...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20933
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89690/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20933: [SPARK-23817][SQL]Migrate ORC file format read path to d...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20933
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20933: [SPARK-23817][SQL]Migrate ORC file format read path to d...

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20933
  
**[Test build #89690 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89690/testReport)**
 for PR 20933 at commit 
[`359f846`](https://github.com/apache/spark/commit/359f846112ba8c7ee9023b7754da4a907068b39b).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends 
Rule[LogicalPlan] `
  * `abstract class FileDataSourceV2 extends DataSourceV2 `
  * `class OrcDataSourceV2 extends FileDataSourceV2 with ReadSupport with 
ReadSupportWithSchema `


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21031: [SPARK-23923][SQL] Add cardinality function

2018-04-22 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21031
  
Sure, done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21123: [SPARK-24045][SQL]Create base class for file data source...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21123
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21123: [SPARK-24045][SQL]Create base class for file data source...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21123
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89689/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21123: [SPARK-24045][SQL]Create base class for file data source...

2018-04-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21123
  
**[Test build #89689 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89689/testReport)**
 for PR 21123 at commit 
[`95628e5`](https://github.com/apache/spark/commit/95628e5a027d029be7dcc4e8e979555bc5e0e4a3).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends 
Rule[LogicalPlan] `
  * `trait FileDataSourceV2 extends DataSourceV2 with DataSourceRegister `


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20933: [SPARK-23817][SQL]Migrate ORC file format read path to d...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20933
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20933: [SPARK-23817][SQL]Migrate ORC file format read path to d...

2018-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20933
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2567/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


