[GitHub] spark issue #21083: [SPARK-23564][SQL] infer additional filters from constra...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21083 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21083: [SPARK-23564][SQL] infer additional filters from constra...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21083 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2572/ Test PASSed.
[GitHub] spark issue #21083: [SPARK-23564][SQL] infer additional filters from constra...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21083 **[Test build #89701 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89701/testReport)** for PR 21083 at commit [`787cddf`](https://github.com/apache/spark/commit/787cddffeba0f21cd40312bcbf84d1bb75126044).
[GitHub] spark issue #21083: [SPARK-23564][SQL] infer additional filters from constra...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21083 retest this please
[GitHub] spark issue #21100: [SPARK-24012][SQL] Union of map and other compatible col...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21100 **[Test build #89700 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89700/testReport)** for PR 21100 at commit [`0845739`](https://github.com/apache/spark/commit/08457394624567de222c89814fe632f9cb1a).
[GitHub] spark issue #21125: [Spark-24024] Fix poisson deviance calculations in GLM t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21125 **[Test build #89699 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89699/testReport)** for PR 21125 at commit [`3c6a4da`](https://github.com/apache/spark/commit/3c6a4dab973851e385b6c9a2c77e5684ad6171a4).
[GitHub] spark issue #20980: [SPARK-23589][SQL] ExternalMapToCatalyst should support ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20980 Merged build finished. Test PASSed.
[GitHub] spark issue #20980: [SPARK-23589][SQL] ExternalMapToCatalyst should support ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20980 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2571/ Test PASSed.
[GitHub] spark issue #21125: [Spark-24024] Fix poisson deviance calculations in GLM t...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21125 ok to test
[GitHub] spark issue #20980: [SPARK-23589][SQL] ExternalMapToCatalyst should support ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20980 **[Test build #89698 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89698/testReport)** for PR 20980 at commit [`eaef6b3`](https://github.com/apache/spark/commit/eaef6b374f86835bb08b9abf6d09d28aec1da9a8).
[GitHub] spark issue #21125: [Spark-24024] Fix poisson deviance calculations in GLM t...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21125 Jenkins, please test this.
[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21124 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89694/ Test PASSed.
[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21124 Merged build finished. Test PASSed.
[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21124 **[Test build #89694 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89694/testReport)** for PR 21124 at commit [`304498e`](https://github.com/apache/spark/commit/304498eaf3ea0bd8a52a150257dc8b38a11c4108). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20940 **[Test build #89697 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89697/testReport)** for PR 20940 at commit [`ae8a388`](https://github.com/apache/spark/commit/ae8a388405d8d3402b5b6a45a7c7855d90538edb).
[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/20940 @edwinalu sorry, it looks like there are conflicts; we would need to rebase.
[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/20940 Jenkins, retest this please
[GitHub] spark issue #21033: [SPARK-19320][MESOS]allow specifying a hard limit on num...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/21033 ping @yanji84
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21123#discussion_r183274730 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2.DataSourceV2 +import org.apache.spark.sql.sources.v2.ReadSupport +import org.apache.spark.sql.sources.v2.WriteSupport + +/** + * The base class for file data source v2. Implementations must have a public, 0-arg constructor. + * + * Note that this is an empty interface. Data source implementations should mix-in at least one of + * the plug-in interfaces like {@link ReadSupport} and {@link WriteSupport}. Otherwise it's just + * a dummy data source which is un-readable/writable. + */ +trait FileDataSourceV2 extends DataSourceV2 with DataSourceRegister { + /** + * Returns an optional V1 [[FileFormat]] class of the same file data source. + * This is a solution for the following cases: + * 1. 
File datasource V2 might be implemented partially during migration. + * E.g. if [[ReadSupport]] is implemented while [[WriteSupport]] is not, + * write path should fall back to V1 implementation. + * 2. File datasource V2 implementations cause regression. + * 3. Catalog support is required, which is still under development for data source V2. + */ + def fallBackFileFormat: Option[Class[_]] = None --- End diff -- Why is it optional?
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21123#discussion_r183274704 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2.DataSourceV2 +import org.apache.spark.sql.sources.v2.ReadSupport +import org.apache.spark.sql.sources.v2.WriteSupport + +/** + * The base class for file data source v2. Implementations must have a public, 0-arg constructor. + * + * Note that this is an empty interface. Data source implementations should mix-in at least one of + * the plug-in interfaces like {@link ReadSupport} and {@link WriteSupport}. Otherwise it's just + * a dummy data source which is un-readable/writable. --- End diff -- We won't need to copy the javadoc for the parent class. 
Just say `A base interface for data source v2 implementations of the built-in file-based data sources.`
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21123#discussion_r183274588 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala --- @@ -213,6 +215,25 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast } } +/** + * Replaces [[FileDataSourceV2]] with [[DataSource]] if parent node is [[InsertIntoTable]]. + */ +class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends Rule[LogicalPlan] { --- End diff -- This needs a few more comments about when this can happen.
[GitHub] spark pull request #21123: [SPARK-24045][SQL]Create base class for file data...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21123#discussion_r183274501 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -89,8 +91,13 @@ case class DataSource( case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String]) - lazy val providingClass: Class[_] = -DataSource.lookupDataSource(className, sparkSession.sessionState.conf) + lazy val providingClass: Class[_] = { +val cls = DataSource.lookupDataSource(className, sparkSession.sessionState.conf) +cls.newInstance() match { + case f: FileDataSourceV2 => f.fallBackFileFormat.getOrElse(cls) --- End diff -- why do we need this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
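The fallback pattern under discussion can be sketched in plain Scala. The names below (`FileSourceV2Like`, `ToyV1Format`, `ToyV2Source`) are illustrative stand-ins, not Spark's actual types, and the sketch takes an already-built instance rather than instantiating via reflection as the diff's `providingClass` does:

```scala
// Stand-in for a file-based V2 source that may declare a V1 format to fall back to.
trait FileSourceV2Like {
  def fallBackFileFormat: Option[Class[_]] = None
}

class ToyV1Format // stand-in for a V1 FileFormat implementation
class ToyV2Source extends FileSourceV2Like {
  override def fallBackFileFormat: Option[Class[_]] = Some(classOf[ToyV1Format])
}

// Mirrors the lookup logic in the diff: if the resolved source is a file-based
// V2 source with a declared fallback, substitute the V1 class; otherwise keep it.
def resolveProvidingClass(instance: AnyRef): Class[_] = instance match {
  case f: FileSourceV2Like => f.fallBackFileFormat.getOrElse(instance.getClass)
  case _                   => instance.getClass
}
```

So a V2 source with a fallback resolves to its V1 class, while any other source resolves to itself.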
[GitHub] spark pull request #21072: [SPARK-23973][SQL] Remove consecutive Sorts
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21072#discussion_r183272597 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -736,12 +736,22 @@ object EliminateSorts extends Rule[LogicalPlan] { } /** - * Removes Sort operation if the child is already sorted + * Removes redundant Sort operation. This can happen: + * 1) if the child is already sorted + * 2) if there is another Sort operator separated by 0...n Project/Filter operators */ object RemoveRedundantSorts extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case Sort(orders, true, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) => child +case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child)) + } + + def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = plan match { +case Project(fields, child) => Project(fields, recursiveRemoveSort(child)) +case Filter(condition, child) => Filter(condition, recursiveRemoveSort(child)) --- End diff -- we should at least add `ResolvedHint`. To easily expand the white list in the future, I'd like to change the code style to
```
def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = plan match {
  case s: Sort => recursiveRemoveSort(s.child)
  case other if canEliminateSort(other) =>
    other.withNewChildren(other.children.map(recursiveRemoveSort))
  case _ => plan
}

def canEliminateSort(plan: LogicalPlan): Boolean = plan match {
  case p: Project => p.projectList.forall(_.deterministic)
  case f: Filter => f.condition.deterministic
  case _: ResolvedHint => true
  ...
  case _ => false
}
```
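The split the reviewer proposes (a recursive sort remover gated by a whitelist predicate) can be illustrated on a minimal stand-in plan tree; the types below are invented for the example and are not Spark's classes:

```scala
// Toy plan tree standing in for Catalyst's LogicalPlan.
sealed trait Plan { def children: Seq[Plan] }
final case class Sort(child: Plan) extends Plan { def children = Seq(child) }
final case class Project(deterministic: Boolean, child: Plan) extends Plan {
  def children = Seq(child)
}
final case class Leaf(name: String) extends Plan { def children = Nil }

def withNewChildren(p: Plan, cs: Seq[Plan]): Plan = p match {
  case _: Sort     => Sort(cs.head)
  case pr: Project => pr.copy(child = cs.head)
  case l: Leaf     => l
}

// Whitelist: only deterministic operators may sit between two Sorts
// for the inner Sort to be removable.
def canEliminateSort(p: Plan): Boolean = p match {
  case pr: Project => pr.deterministic
  case _           => false
}

// Strips Sorts while descending through whitelisted operators only.
def recursiveRemoveSort(plan: Plan): Plan = plan match {
  case Sort(child)                      => recursiveRemoveSort(child)
  case other if canEliminateSort(other) =>
    withNewChildren(other, other.children.map(recursiveRemoveSort))
  case _                                => plan
}
```

With this shape, extending the whitelist later means adding one case to `canEliminateSort` without touching the traversal.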
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20937 LGTM except a few minor comments
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183271569 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -361,6 +361,12 @@ class JacksonParser( // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. throw BadRecordException(() => recordLiteral(record), () => None, e) + case e: CharConversionException if options.encoding.isEmpty => +val msg = + """JSON parser cannot handle a character in its input. +|Specifying encoding as an input option explicitly might help to resolve the issue. +|""".stripMargin + e.getMessage +throw new CharConversionException(msg) --- End diff -- BTW we should also follow the existing rule and wrap the exception with `BadRecordException`. See the code above. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183271309 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -361,6 +361,12 @@ class JacksonParser( // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. throw BadRecordException(() => recordLiteral(record), () => None, e) + case e: CharConversionException if options.encoding.isEmpty => +val msg = + """JSON parser cannot handle a character in its input. +|Specifying encoding as an input option explicitly might help to resolve the issue. +|""".stripMargin + e.getMessage +throw new CharConversionException(msg) --- End diff -- This will lose the original stack trace, we should do `throw BadRecordException(() => recordLiteral(record), () => None, e)` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
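The stack-trace point can be demonstrated with plain Scala. `BadRecord` below is a simplified stand-in for Spark's `BadRecordException` (which also carries the record and a partial result), shown only to contrast rethrowing with and without a cause:

```scala
import java.io.CharConversionException

// Rethrowing a fresh exception with only a message drops the original
// stack trace and cause chain.
def rethrowLossy(e: CharConversionException): Nothing =
  throw new CharConversionException("JSON parser cannot handle a character: " + e.getMessage)

// Simplified stand-in for BadRecordException: passing the original exception
// as the cause preserves it for later inspection.
final case class BadRecord(record: String, cause: Throwable)
  extends Exception("malformed record", cause)

def rethrowWrapped(record: String, e: CharConversionException): Nothing =
  throw BadRecord(record, e)
```

A caller that catches `BadRecord` can still reach the original `CharConversionException` via `getCause`, which the lossy variant cannot offer.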
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183271101 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +86,41 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) + /** + * A string between two consecutive JSON records. + */ val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => require(sep.nonEmpty, "'lineSep' cannot be an empty string.") sep } - // Note that the option 'lineSep' uses a different default value in read and write. - val lineSeparatorInRead: Option[Array[Byte]] = -lineSeparator.map(_.getBytes(StandardCharsets.UTF_8)) - // Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8. + + /** + * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE. + * If the encoding is not specified (None), it will be detected automatically + * when the multiLine option is set to `true`. + */ + val encoding: Option[String] = parameters.get("encoding") +.orElse(parameters.get("charset")).map { enc => + // The following encodings are not supported in per-line mode (multiline is false) + // because they cause some problems in reading files with BOM which is supposed to + // present in the files with such encodings. After splitting input files by lines, + // only the first lines will have the BOM which leads to impossibility for reading + // the rest lines. Besides of that, the lineSep option must have the BOM in such + // encodings which can never present between lines. 
+ val blacklist = Seq(Charset.forName("UTF-16"), Charset.forName("UTF-32")) + val isBlacklisted = blacklist.contains(Charset.forName(enc)) + require(multiLine || !isBlacklisted, +s"""The ${enc} encoding must not be included in the blacklist when multiLine is disabled: + | ${blacklist.mkString(", ")}""".stripMargin) + + val forcingLineSep = !(multiLine == false && enc != "UTF-8" && lineSeparator.isEmpty) --- End diff -- `enc != "UTF-8"`: we should not compare strings directly, but turn them into `Charset`
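The reviewer's point about charset comparison in runnable form: comparing encoding names as strings misses the JDK's registered aliases, while normalizing both sides through `java.nio.charset.Charset` does not (a minimal sketch, not Spark's code):

```scala
import java.nio.charset.Charset

// String comparison: "utf-8" and "UTF8" name the same charset as "UTF-8",
// but fail an exact string match.
def isUtf8ByString(enc: String): Boolean = enc == "UTF-8"

// Charset.forName canonicalizes names and resolves registered aliases,
// so equivalent spellings compare equal.
def isUtf8ByCharset(enc: String): Boolean =
  Charset.forName(enc) == Charset.forName("UTF-8")
```

This is why the review asks to turn both operands into `Charset` before comparing.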
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183270773 --- Diff: python/pyspark/sql/readwriter.py --- @@ -773,6 +776,8 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the default value, ``-MM-dd'T'HH:mm:ss.SSSXXX``. +:param encoding: specifies encoding (charset) of saved json files. If None is set, +the default UTF-8 charset will be used. --- End diff -- shall we mention that, if `encoding` is set, `lineSep` also needs to be set when `multiLine` is false?
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183270654 --- Diff: python/pyspark/sql/readwriter.py --- @@ -237,6 +237,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. +:param encoding: standard encoding (charset) name, for example UTF-8, UTF-16LE and UTF-32BE. + If None is set, the encoding of input JSON will be detected automatically + when the multiLine option is set to ``true``. --- End diff -- Does it mean users have to set the encoding if `multiLine` is false? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21122#discussion_r183270323 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala --- @@ -31,10 +30,16 @@ import org.apache.spark.util.ListenerBus * * Implementations should throw [[NoSuchDatabaseException]] when databases don't exist. */ -abstract class ExternalCatalog - extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] { +trait ExternalCatalog { import CatalogTypes.TablePartitionSpec + // Returns the underlying catalog class (e.g., HiveExternalCatalog). + def unwrapped: ExternalCatalog = this --- End diff -- Maybe we can move it to `ExternalCatalogWithListener` and mark `SharedState.externalCatalog` as `ExternalCatalogWithListener` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21100: [SPARK-24012][SQL] Union of map and other compati...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21100#discussion_r183270011 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -111,6 +111,18 @@ object TypeCoercion { val dataType = findTightestCommonType(f1.dataType, f2.dataType).get StructField(f1.name, dataType, nullable = f1.nullable || f2.nullable) })) +case (a1 @ ArrayType(et1, containsNull1), a2 @ ArrayType(et2, containsNull2)) + if a1.sameType(a2) => + findTightestCommonType(et1, et2).map(ArrayType(_, containsNull1 || containsNull2)) +case (m1 @ MapType(keyType1, valueType1, n1), m2 @ MapType(keyType2, valueType2, n2)) --- End diff -- ditto: `kt1`, `vt1`, `hasNull1` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21100: [SPARK-24012][SQL] Union of map and other compati...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21100#discussion_r183270045 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -111,6 +111,18 @@ object TypeCoercion { val dataType = findTightestCommonType(f1.dataType, f2.dataType).get StructField(f1.name, dataType, nullable = f1.nullable || f2.nullable) })) +case (a1 @ ArrayType(et1, containsNull1), a2 @ ArrayType(et2, containsNull2)) + if a1.sameType(a2) => + findTightestCommonType(et1, et2).map(ArrayType(_, containsNull1 || containsNull2)) +case (m1 @ MapType(keyType1, valueType1, n1), m2 @ MapType(keyType2, valueType2, n2)) + if m1.sameType(m2) => + val keyType = findTightestCommonType(keyType1, keyType2) + val valueType = findTightestCommonType(valueType1, valueType2) + if(keyType.isEmpty || valueType.isEmpty) { --- End diff -- We don't need this, it's guaranteed by `m1.sameType(m2)` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21100: [SPARK-24012][SQL] Union of map and other compati...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21100#discussion_r183269995 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -111,6 +111,18 @@ object TypeCoercion { val dataType = findTightestCommonType(f1.dataType, f2.dataType).get StructField(f1.name, dataType, nullable = f1.nullable || f2.nullable) })) +case (a1 @ ArrayType(et1, containsNull1), a2 @ ArrayType(et2, containsNull2)) --- End diff -- we can shorten the name here: `hasNull1` `hasNull2` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21100: [SPARK-24012][SQL] Union of map and other compati...
Github user liutang123 commented on a diff in the pull request: https://github.com/apache/spark/pull/21100#discussion_r183269702 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -171,6 +171,15 @@ object TypeCoercion { .orElse((t1, t2) match { case (ArrayType(et1, containsNull1), ArrayType(et2, containsNull2)) => findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || containsNull2)) +case (MapType(keyType1, valueType1, n1), MapType(keyType2, valueType2, n2)) --- End diff -- Hi, I implemented this logic in `findTightestCommonType`; looking forward to further review.
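The coercion rule under review can be sketched on a toy type lattice (invented types, not Spark's): two array or map types that agree structurally widen element-wise, and the nullability flags combine with `||` so the result permits nulls if either input does. Here the structural agreement that Spark's `sameType` guard provides is expressed through `Option` instead:

```scala
// Toy type lattice standing in for Catalyst DataTypes.
sealed trait DType
case object IntT extends DType
final case class ArrayT(element: DType, containsNull: Boolean) extends DType
final case class MapT(key: DType, value: DType, valueContainsNull: Boolean) extends DType

// Tightest common type: identical types unify as-is; arrays and maps unify
// recursively, OR-ing the null-permitting flags; anything else fails.
def tightestCommon(t1: DType, t2: DType): Option[DType] = (t1, t2) match {
  case (a, b) if a == b => Some(a)
  case (ArrayT(e1, n1), ArrayT(e2, n2)) =>
    tightestCommon(e1, e2).map(ArrayT(_, n1 || n2))
  case (MapT(k1, v1, n1), MapT(k2, v2, n2)) =>
    for {
      k <- tightestCommon(k1, k2)
      v <- tightestCommon(v1, v2)
    } yield MapT(k, v, n1 || n2)
  case _ => None
}
```

In Spark's version the `m1.sameType(m2)` guard already guarantees the recursive calls succeed, which is why the reviewer notes the extra emptiness check is unnecessary.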
[GitHub] spark issue #21100: [SPARK-24012][SQL] Union of map and other compatible col...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21100 **[Test build #89696 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89696/testReport)** for PR 21100 at commit [`19b5c6a`](https://github.com/apache/spark/commit/19b5c6a84b38b4ce275093f79eee0ff594e50f90).
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183269323 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala --- @@ -794,4 +794,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext } } } + + private def isMaterialized(df: DataFrame): Boolean = { +val nodes = df.queryExecution.executedPlan.collect { case c: InMemoryTableScanExec => c } +assert(nodes.nonEmpty, "DataFrame is not cached\n" + df.queryExecution.analyzed) +nodes.forall(_.relation.cacheBuilder._cachedColumnBuffers != null) + } + + test("SPARK-23880 table cache should be lazy and don't trigger any jobs") { --- End diff -- I feel it's more clear to create a listener and explicitly show we don't trigger any jobs after calling `Dataset.cache` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183269227 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala --- @@ -794,4 +794,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext } } } + + private def isMaterialized(df: DataFrame): Boolean = { +val nodes = df.queryExecution.executedPlan.collect { case c: InMemoryTableScanExec => c } +assert(nodes.nonEmpty, "DataFrame is not cached\n" + df.queryExecution.analyzed) +nodes.forall(_.relation.cacheBuilder._cachedColumnBuffers != null) + } + + test("SPARK-23880 table cache should be lazy and don't trigger any jobs") { --- End diff -- how does this test prove we don't trigger jobs? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to b...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21122#discussion_r183269033 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala --- @@ -31,10 +30,16 @@ import org.apache.spark.util.ListenerBus * * Implementations should throw [[NoSuchDatabaseException]] when databases don't exist. */ -abstract class ExternalCatalog - extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] { +trait ExternalCatalog { --- End diff -- Based on your JIRA comment, can we put `@DeveloperApi` or `@InterfaceStability.Unstable` in this PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to b...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21122#discussion_r183268517 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala --- @@ -31,10 +30,16 @@ import org.apache.spark.util.ListenerBus * * Implementations should throw [[NoSuchDatabaseException]] when databases don't exist. */ -abstract class ExternalCatalog - extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] { +trait ExternalCatalog { import CatalogTypes.TablePartitionSpec + // Returns the underlying catalog class (e.g., HiveExternalCatalog). + def unwrapped: ExternalCatalog = this --- End diff -- @gatorsmile, we had better skip the default implementation here. What do you think?
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r183268807 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala --- @@ -55,56 +42,38 @@ object InMemoryRelation { private[columnar] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) -case class InMemoryRelation( -output: Seq[Attribute], +case class CachedRDDBuilder( useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, @transient child: SparkPlan, tableName: Option[String])( -@transient var _cachedColumnBuffers: RDD[CachedBatch] = null, -val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator, -statsOfPlanToCache: Statistics, -override val outputOrdering: Seq[SortOrder]) - extends logical.LeafNode with MultiInstanceRelation { - - override protected def innerChildren: Seq[SparkPlan] = Seq(child) - - override def doCanonicalize(): logical.LogicalPlan = -copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)), - storageLevel = StorageLevel.NONE, - child = child.canonicalized, - tableName = None)( - _cachedColumnBuffers, - sizeInBytesStats, - statsOfPlanToCache, - outputOrdering) - - override def producedAttributes: AttributeSet = outputSet +@transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null) { - @transient val partitionStatistics = new PartitionStatistics(output) + val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator - override def computeStats(): Statistics = { -if (sizeInBytesStats.value == 0L) { - // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache. - // Note that we should drop the hint info here. We may cache a plan whose root node is a hint - // node. When we lookup the cache with a semantically same plan without hint info, the plan - // returned by cache lookup should not have hint info. 
If we lookup the cache with a - // semantically same plan with a different hint info, `CacheManager.useCachedData` will take - // care of it and retain the hint info in the lookup input plan. - statsOfPlanToCache.copy(hints = HintInfo()) -} else { - Statistics(sizeInBytes = sizeInBytesStats.value.longValue) + def cachedColumnBuffers: RDD[CachedBatch] = { +if (_cachedColumnBuffers == null) { + synchronized { +if (_cachedColumnBuffers == null) { + _cachedColumnBuffers = buildBuffers() +} + } } +_cachedColumnBuffers } - // If the cached column buffers were not passed in, we calculate them in the constructor. - // As in Spark, the actual work of caching is lazy. - if (_cachedColumnBuffers == null) { -buildBuffers() + def clearCache(blocking: Boolean = true): Unit = { +if (_cachedColumnBuffers != null) { + synchronized { +if (_cachedColumnBuffers != null) { + _cachedColumnBuffers.unpersist(blocking) --- End diff -- shall we also do `_cachedColumnBuffers = null` so that `unpersist` won't be called twice? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
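The lazy-build / clear-once pattern under discussion, together with the reviewer's suggestion to null out the field after `unpersist`, can be sketched in isolation. This is a hypothetical minimal class, not Spark's actual `CachedRDDBuilder`; `build` and `unpersist` stand in for `buildBuffers()` and `RDD.unpersist`:

```scala
// Minimal sketch of the double-checked lazy build plus a clear() that
// resets the field so the unpersist callback can never run twice.
class LazyCache[T >: Null <: AnyRef](build: () => T, unpersist: T => Unit) {
  @volatile private var cached: T = null

  def get(): T = {
    if (cached == null) {
      synchronized {
        // Re-check inside the lock: another thread may have built it already.
        if (cached == null) cached = build()
      }
    }
    cached
  }

  def clear(): Unit = synchronized {
    if (cached != null) {
      unpersist(cached)
      cached = null // reset so a second clear() is a no-op
    }
  }
}
```

With this shape, calling `clear()` twice invokes `unpersist` only once, which is exactly the behavior the review comment asks for.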
[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...
Github user ConcurrencyPractitioner commented on the issue: https://github.com/apache/spark/pull/21124 +1
[GitHub] spark issue #19222: [SPARK-10399][SPARK-23879][CORE][SQL] Introduce multiple...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19222 Instead of round-robining the memory block type across iterations, can we just operate on all the memory blocks in each iteration? Then we can remove the `if-else` and make the benchmark focus more on the memory block. As a comparison, we can create a byte array, a long array, and an off-heap array, and also operate on them in each iteration.
[GitHub] spark issue #21111: [SPARK-23877][SQL][followup] use PhysicalOperation to si...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/2 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2570/ Test PASSed.
[GitHub] spark issue #21111: [SPARK-23877][SQL][followup] use PhysicalOperation to si...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/2 Merged build finished. Test PASSed.
[GitHub] spark issue #21111: [SPARK-23877][SQL][followup] use PhysicalOperation to si...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/2 **[Test build #89695 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89695/testReport)** for PR 2 at commit [`d624955`](https://github.com/apache/spark/commit/d624955aa7fd07acde698a50d05ed5679ee91533).
[GitHub] spark pull request #21117: [followup][SPARK-10399][SPARK-23879][Core] Free u...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21117
[GitHub] spark pull request #21111: [SPARK-23877][SQL][followup] use PhysicalOperatio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/2#discussion_r183266741 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala --- @@ -114,11 +119,8 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic relation match { case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) => val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) -val partitionData = fsRelation.location.listFiles(relFilters, Nil) -// partition data may be a stream, which can cause serialization to hit stack level too -// deep exceptions because it is a recursive structure in memory. converting to array -// avoids the problem. --- End diff -- > Would it be reasonable for a future commit to remove the @transient modifier and re-introduce the problem? That's very unlikely. SPARK-21884 guarantees Spark won't serialize the rows and we have regression tests to protect us. BTW it would be a lot of work to make sure all the places that create `LocalRelation` do not use recursive structure. I'll add some comments to `LocalRelation` to emphasize it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21117: [followup][SPARK-10399][SPARK-23879][Core] Free unused o...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21117 thanks, merging to master!
[GitHub] spark issue #21125: [Spark-24024] Fix poisson deviance calculations in GLM t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21125 Can one of the admins verify this patch?
[GitHub] spark issue #14083: [SPARK-16406][SQL] Improve performance of LogicalPlan.re...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/14083 LGTM
[GitHub] spark pull request #14083: [SPARK-16406][SQL] Improve performance of Logical...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14083#discussion_r183265525 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala --- @@ -138,6 +140,88 @@ package object expressions { def indexOf(exprId: ExprId): Int = { Option(exprIdToOrdinal.get(exprId)).getOrElse(-1) } + +private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = { + m.mapValues(_.distinct).map(identity) +} + +/** Map to use for direct case insensitive attribute lookups. */ +@transient private lazy val direct: Map[String, Seq[Attribute]] = { + unique(attrs.groupBy(_.name.toLowerCase)) +} + +/** Map to use for qualified case insensitive attribute lookups. */ +@transient private val qualified: Map[(String, String), Seq[Attribute]] = { + val grouped = attrs.filter(_.qualifier.isDefined).groupBy { a => +(a.qualifier.get.toLowerCase, a.name.toLowerCase) + } + unique(grouped) +} + +/** Perform attribute resolution given a name and a resolver. */ +def resolve(nameParts: Seq[String], resolver: Resolver): Option[NamedExpression] = { + // Collect matching attributes given a name and a lookup. + def collectMatches(name: String, candidates: Option[Seq[Attribute]]): Seq[Attribute] = { +candidates.toSeq.flatMap(_.collect { + case a if resolver(a.name, name) => a.withName(name) +}) + } + + // Find matches for the given name assuming that the 1st part is a qualifier (i.e. table name, + // alias, or subquery alias) and the 2nd part is the actual name. This returns a tuple of + // matched attributes and a list of parts that are to be resolved. + // + // For example, consider an example where "a" is the table name, "b" is the column name, + // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b", + // and the second element will be List("c"). 
+ val matches = nameParts match { +case qualifier +: name +: nestedFields => + val key = (qualifier.toLowerCase, name.toLowerCase) + val attributes = collectMatches(name, qualified.get(key)).filter { a => +resolver(qualifier, a.qualifier.get) + } + (attributes, nestedFields) +case all => + (Nil, all) + } + + // If none of attributes match `table.column` pattern, we try to resolve it as a column. + val (candidates, nestedFields) = matches match { +case (Seq(), _) => + val name = nameParts.head + val attributes = collectMatches(name, direct.get(name.toLowerCase)) + (attributes, nameParts.tail) +case _ => matches + } + + def name = UnresolvedAttribute(nameParts).name + candidates match { +case Seq(a) if nestedFields.nonEmpty => + // One match, but we also need to extract the requested nested field. + // The foldLeft adds ExtractValues for every remaining parts of the identifier, + // and aliased it with the last part of the name. + // For example, consider "a.b.c", where "a" is resolved to an existing attribute. + // Then this will add ExtractValue("c", ExtractValue("b", a)), and alias the final + // expression as "c". + val fieldExprs = nestedFields.foldLeft(a: Expression) { (e, name) => +ExtractValue(e, Literal(name), resolver) + } + Some(Alias(fieldExprs, nestedFields.last)()) + +case Seq(a) => + // One match, no nested fields, use it. + Some(a) + +case Seq() => + // No matches. + None + +case ambiguousReferences => + // More than one match. + val referenceNames = ambiguousReferences.mkString(", ") --- End diff -- to pass the test, we should follow the previous code: `ambiguousReferences.map(_._1.qualifiedName).mkString(", ")` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
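The two-phase lookup the diff above describes — first try (qualifier, name) against the qualified map, then fall back to treating the head part as a bare column name, carrying any remaining parts as nested fields — can be illustrated with a toy resolver. This is a hypothetical simplification (no `Resolver`, no case-insensitive maps), not the actual `AttributeSeq.resolve`:

```scala
// Toy model of the resolution order described above. For "a.b.c":
// "a" is first tried as a qualifier for column "b", leaving Seq("c") as
// nested struct fields; otherwise "a" is tried as a plain column name.
case class Attr(qualifier: Option[String], name: String)

def resolve(nameParts: Seq[String], attrs: Seq[Attr]): Option[(Attr, Seq[String])] = {
  val qualifiedMatch = nameParts match {
    case qualifier +: name +: nestedFields =>
      attrs.find(a => a.qualifier.contains(qualifier) && a.name == name)
        .map(a => (a, nestedFields))
    case _ => None
  }
  // Fall back: treat the first part as the column name itself.
  qualifiedMatch.orElse(attrs.find(_.name == nameParts.head).map(a => (a, nameParts.tail)))
}
```

In the real code the leftover nested-field parts are then folded into `ExtractValue` expressions and aliased with the last part of the name.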
[GitHub] spark pull request #21125: [Spark-24024] Fix poisson deviance calculations i...
GitHub user tengpeng opened a pull request: https://github.com/apache/spark/pull/21125 [Spark-24024] Fix poisson deviance calculations in GLM to handle y = 0 ## What changes were proposed in this pull request? Spark users have reported that the deviance calculation does not handle y = 0, so the correct model summary cannot be obtained. The user confirmed that the issue is in override def deviance(y: Double, mu: Double, weight: Double): Double = { 2.0 * weight * (y * math.log(y / mu) - (y - mu)) } when y = 0. The user also mentioned there are many other places he believes we should check for the same thing. However, no other changes are needed, including for the Gamma distribution. ## How was this patch tested? Added a comparison with the R deviance calculation to the existing unit test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tengpeng/spark Spark24024GLM Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21125.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21125 commit 3c6a4dab973851e385b6c9a2c77e5684ad6171a4 Author: Teng Peng Date: 2018-04-23T02:31:25Z fix deviance calculation when y = 0
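The problem in the formula above is that `y * math.log(y / mu)` evaluates to `0.0 * -Infinity = NaN` when y = 0, while the correct Poisson deviance takes that term as 0 (its limit as y → 0). A sketch of the kind of fix described, assuming the `deviance` signature shown in the PR (this is an illustrative guard, not necessarily the exact patch):

```scala
// Poisson deviance with the y * log(y / mu) term guarded for y == 0,
// matching the limit y * log(y) -> 0 as y -> 0.
def ylogy(y: Double, mu: Double): Double =
  if (y == 0.0) 0.0 else y * math.log(y / mu)

def deviance(y: Double, mu: Double, weight: Double): Double =
  2.0 * weight * (ylogy(y, mu) - (y - mu))
```

With the guard, `deviance(0.0, 2.0, 1.0)` yields a finite value (4.0) instead of NaN, so aggregated model summaries stay well-defined.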
[GitHub] spark pull request #14083: [SPARK-16406][SQL] Improve performance of Logical...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14083#discussion_r183265417 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala --- @@ -138,6 +140,88 @@ package object expressions { def indexOf(exprId: ExprId): Int = { Option(exprIdToOrdinal.get(exprId)).getOrElse(-1) } + +private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = { + m.mapValues(_.distinct).map(identity) +} + +/** Map to use for direct case insensitive attribute lookups. */ +@transient private lazy val direct: Map[String, Seq[Attribute]] = { + unique(attrs.groupBy(_.name.toLowerCase)) +} + +/** Map to use for qualified case insensitive attribute lookups. */ +@transient private val qualified: Map[(String, String), Seq[Attribute]] = { + val grouped = attrs.filter(_.qualifier.isDefined).groupBy { a => +(a.qualifier.get.toLowerCase, a.name.toLowerCase) + } + unique(grouped) +} + +/** Perform attribute resolution given a name and a resolver. */ +def resolve(nameParts: Seq[String], resolver: Resolver): Option[NamedExpression] = { + // Collect matching attributes given a name and a lookup. + def collectMatches(name: String, candidates: Option[Seq[Attribute]]): Seq[Attribute] = { +candidates.toSeq.flatMap(_.collect { + case a if resolver(a.name, name) => a.withName(name) +}) + } + + // Find matches for the given name assuming that the 1st part is a qualifier (i.e. table name, + // alias, or subquery alias) and the 2nd part is the actual name. This returns a tuple of + // matched attributes and a list of parts that are to be resolved. + // + // For example, consider an example where "a" is the table name, "b" is the column name, + // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b", + // and the second element will be List("c"). 
+ val matches = nameParts match { +case qualifier +: name +: nestedFields => + val key = (qualifier.toLowerCase, name.toLowerCase) + val attributes = collectMatches(name, qualified.get(key)).filter { a => +resolver(qualifier, a.qualifier.get) + } + (attributes, nestedFields) +case all => + (Nil, all) + } + + // If none of attributes match `table.column` pattern, we try to resolve it as a column. + val (candidates, nestedFields) = matches match { +case (Seq(), _) => + val name = nameParts.head + val attributes = collectMatches(name, direct.get(name.toLowerCase)) + (attributes, nameParts.tail) +case _ => matches + } + + def name = UnresolvedAttribute(nameParts).name + candidates match { +case Seq(a) if nestedFields.nonEmpty => + // One match, but we also need to extract the requested nested field. + // The foldLeft adds ExtractValues for every remaining parts of the identifier, + // and aliased it with the last part of the name. + // For example, consider "a.b.c", where "a" is resolved to an existing attribute. + // Then this will add ExtractValue("c", ExtractValue("b", a)), and alias the final + // expression as "c". + val fieldExprs = nestedFields.foldLeft(a: Expression) { (e, name) => +ExtractValue(e, Literal(name), resolver) --- End diff -- `ExtractValue` has the same perf problem, but this can be fixed in a follow up --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21082 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89693/ Test PASSed.
[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21082 Merged build finished. Test PASSed.
[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21082 **[Test build #89693 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89693/testReport)** for PR 21082 at commit [`27158d9`](https://github.com/apache/spark/commit/27158d9873d54de9312db9e2db5c88d430589ade). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...
Github user tedyu commented on the issue: https://github.com/apache/spark/pull/21124 +1
[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21124 @brkyvz PTAL.
[GitHub] spark issue #21124: SPARK-23004
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21124 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2569/ Test PASSed.
[GitHub] spark issue #21124: SPARK-23004
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21124 Merged build finished. Test PASSed.
[GitHub] spark issue #21124: SPARK-23004
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21124 **[Test build #89694 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89694/testReport)** for PR 21124 at commit [`304498e`](https://github.com/apache/spark/commit/304498eaf3ea0bd8a52a150257dc8b38a11c4108).
[GitHub] spark pull request #21124: SPARK-23004
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/21124 SPARK-23004 ## What changes were proposed in this pull request? A structured streaming query with a streaming aggregation can throw the following error in rare cases. ``` java.lang.IllegalStateException: Cannot remove after already committed or aborted at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659) at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.remove(HDFSBackedStateStoreProvider.scala:143) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:233) at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191) at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80) at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111) at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at ... ``` This can happen when the following conditions are accidentally hit.
1. Streaming aggregation with an aggregation function that is a subclass of [`TypedImperativeAggregate`](https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L473) (for example, `collect_set`, `collect_list`, `percentile`, etc.). 2. Query running in `update` mode. 3. After the shuffle, a partition has exactly 128 records. This happens because of the following. 1. The `StateStoreSaveExec` used in streaming aggregations has the [following logic](https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L359) when used in `update` mode. - There is an iterator that reads data from its parent iterator and updates the StateStore. - When the parent iterator is fully consumed (i.e. `baseIterator.hasNext` returns false), all state changes are committed by calling `StateStore.commit`. - The implementation of `StateStore.commit()` in `HDFSBackedStateStore` does not allow itself to be called twice. However, the logic is such that, if `hasNext` is called multiple times after `baseIterator.hasNext` has returned false, each call will invoke `StateStore.commit`. - For most aggregation functions this is okay, because `hasNext` is only called once. But that's not the case with `TypedImperativeAggregate`s. 2. `TypedImperativeAggregate`s are executed using `ObjectHashAggregateExec`, which tries two kinds of hashmaps for aggregation. - It first uses an unsorted hashmap. If the size of the hashmap grows beyond a certain threshold (default 128), it switches to a sorted hashmap.
- The [switching logic](https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala) in `ObjectAggregationIterator` (used by `ObjectHashAggregateExec`) is such that when the number of records matches the threshold (i.e. 128), it ends up calling `iterator.hasNext` twice. When the above two conditions are combined, this leads to the above error. This latent bug has existed since Spark 2.1, when `ObjectHashAggregateExec` was introduced. The solution is to use `NextIterator` or `CompletionIterator`, each of which has a flag to prevent the "onCompletion" tasks from being called more than once. In this PR, I chose to implement it using `NextIterator`. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tdas/spark SPARK-23004 Alternatively you can review and apply these changes as the patch at:
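The commit-exactly-once guard this PR relies on can be sketched as a small wrapper iterator. This is an illustrative stand-in for Spark's `NextIterator`/`CompletionIterator`, not their actual implementation:

```scala
// Sketch of the fix: a flag ensures repeated hasNext calls after the
// underlying iterator is exhausted trigger the completion callback
// (e.g. StateStore.commit) only once.
class CommitOnceIterator[A](underlying: Iterator[A], onCompletion: () => Unit)
    extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !completed) {
      completed = true // flip the flag before running the callback
      onCompletion()
    }
    more
  }

  override def next(): A = underlying.next()
}
```

Calling `hasNext` any number of times after exhaustion now invokes the callback exactly once, which is the property the buggy code in `StateStoreSaveExec` lacked.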
[GitHub] spark issue #6300: [SPARK-7127] [MLLIB] Adding broadcast of model before pre...
Github user Tamilselvan-Veeramani commented on the issue: https://github.com/apache/spark/pull/6300 Can anyone help me with how to use the transformImpl method for the predictProbability method? I see it's not implemented in transformImpl of RandomForestClassificationModel, hence my streaming job broadcasts the RF model for every mini batch. Help me with a way to implement this. Thanks
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183257701 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -217,33 +295,32 @@ class StringIndexerModel ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) - @Since("2.0.0") - override def transform(dataset: Dataset[_]): DataFrame = { -if (!dataset.schema.fieldNames.contains($(inputCol))) { - logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " + -"Skip StringIndexerModel.") - return dataset.toDF -} -transformSchema(dataset.schema, logging = true) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) -val filteredLabels = getHandleInvalid match { - case StringIndexer.KEEP_INVALID => labels :+ "__unknown" - case _ => labels + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = { +var filteredDataset = dataset.na.drop(inputColNames.filter( + dataset.schema.fieldNames.contains(_))) --- End diff -- add one empty line for readability
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183255152 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -130,21 +161,53 @@ class StringIndexer @Since("1.4.0") ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + @Since("2.0.0") override def fit(dataset: Dataset[_]): StringIndexerModel = { transformSchema(dataset.schema, logging = true) -val values = dataset.na.drop(Array($(inputCol))) - .select(col($(inputCol)).cast(StringType)) - .rdd.map(_.getString(0)) -val labels = $(stringOrderType) match { - case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2) -.map(_._1).toArray - case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2) -.map(_._1).toArray - case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _) - case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _) -} -copyValues(new StringIndexerModel(uid, labels).setParent(this)) + +val (inputCols, _) = getInOutCols() +val zeroState = Array.fill(inputCols.length)(new OpenHashMap[String, Long]()) + +// Counts by the string values in the dataset. 
+val countByValueArray = dataset.na.drop(inputCols) + .select(inputCols.map(col(_).cast(StringType)): _*) + .rdd.treeAggregate(zeroState)( +(state: Array[OpenHashMap[String, Long]], row: Row) => { + for (i <- 0 until inputCols.length) { +state(i).changeValue(row.getString(i), 1L, _ + 1) + } + state +}, +(state1: Array[OpenHashMap[String, Long]], state2: Array[OpenHashMap[String, Long]]) => { + for (i <- 0 until inputCols.length) { +state2(i).foreach { case (key: String, count: Long) => + state1(i).changeValue(key, count, _ + count) +} + } + state1 +} + ) + +// In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet. +val labelsArray = countByValueArray.map { countByValue => + $(stringOrderType) match { +case StringIndexer.frequencyDesc => + countByValue.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray +case StringIndexer.frequencyAsc => + countByValue.toSeq.sortBy(_._1).sortBy(_._2).map(_._1).toArray +case StringIndexer.alphabetDesc => countByValue.toSeq.map(_._1).sortWith(_ > _).toArray --- End diff -- I think we can break the code into two paths. One is sorting by frequency which requires to compute the counts, and the other is sorting by alphabet which only requires distinct. We could move the `countByValueArray` code into labelsArray. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
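The `frequencyDesc`/`frequencyAsc` branches in the diff above rely on a stable two-pass sort — first by label name, then by (negated) count — so that labels with equal frequency end up in alphabetical order. A standalone illustration in plain Scala, outside Spark:

```scala
// Two-pass stable sort: alphabetical first, then by descending count.
// Ties in count ("a" and "b" both occur twice) keep alphabetical order.
val countByValue = Map("b" -> 2L, "a" -> 2L, "c" -> 5L)
val labels = countByValue.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1)
// labels == Seq("c", "a", "b")
```

This works because Scala's `sortBy` is stable, so the second sort preserves the alphabetical order established by the first wherever counts are equal.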
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183253488 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params with HasHandleInvalid wi @Since("2.3.0") def getStringOrderType: String = $(stringOrderType) - /** Validates and transforms the input schema. */ - protected def validateAndTransformSchema(schema: StructType): StructType = { -val inputColName = $(inputCol) + /** Returns the input and output column names corresponding in pair. */ + private[feature] def getInOutCols(): (Array[String], Array[String]) = { +ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), Seq(outputCols)) + +if (isSet(inputCol)) { --- End diff -- If both `inputCol` and `inputCols` are set, throw an exception. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r18325 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -217,33 +295,32 @@ class StringIndexerModel ( @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) - @Since("2.0.0") - override def transform(dataset: Dataset[_]): DataFrame = { -if (!dataset.schema.fieldNames.contains($(inputCol))) { - logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " + -"Skip StringIndexerModel.") - return dataset.toDF -} -transformSchema(dataset.schema, logging = true) + /** @group setParam */ + @Since("2.4.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) -val filteredLabels = getHandleInvalid match { - case StringIndexer.KEEP_INVALID => labels :+ "__unknown" - case _ => labels + /** @group setParam */ + @Since("2.4.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = { +var filteredDataset = dataset.na.drop(inputColNames.filter( + dataset.schema.fieldNames.contains(_))) +for (i <- 0 until inputColNames.length) { + val inputColName = inputColNames(i) + val labelToIndex = labelsToIndexArray(i) + val filterer = udf { label: String => +labelToIndex.contains(label) --- End diff -- Isn't this very expensive? We basically look up `labelToIndex` twice. It would be cool if we supported `lit(Map())` so we could do those lookups natively in SQL, and also do `na.drop` together in whole-stage codegen.
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183253932

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params with HasHandleInvalid wi
   @Since("2.3.0")
   def getStringOrderType: String = $(stringOrderType)

-  /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType): StructType = {
-    val inputColName = $(inputCol)
+  /** Returns the input and output column names corresponding in pair. */
+  private[feature] def getInOutCols(): (Array[String], Array[String]) = {
+    ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), Seq(outputCols))
+
+    if (isSet(inputCol)) {
+      (Array($(inputCol)), Array($(outputCol)))
+    } else {
+      require($(inputCols).length == $(outputCols).length,
+        "The number of input columns does not match output columns")
+      ($(inputCols), $(outputCols))
+    }
+  }
+
+  private def validateAndTransformField(
+      schema: StructType,
+      inputColName: String,
+      outputColName: String): StructField = {
     val inputDataType = schema(inputColName).dataType
     require(inputDataType == StringType || inputDataType.isInstanceOf[NumericType],
       s"The input column $inputColName must be either string type or numeric type, " +
         s"but got $inputDataType.")
-    val inputFields = schema.fields
-    val outputColName = $(outputCol)
-    require(inputFields.forall(_.name != outputColName),
+    require(schema.fields.forall(_.name != outputColName),
       s"Output column $outputColName already exists.")
-    val attr = NominalAttribute.defaultAttr.withName($(outputCol))
-    val outputFields = inputFields :+ attr.toStructField()
-    StructType(outputFields)
+    NominalAttribute.defaultAttr.withName($(outputCol)).toStructField()
+  }
+
+  /** Validates and transforms the input schema. */
+  protected def validateAndTransformSchema(
+      schema: StructType,
+      skipNonExistsCol: Boolean = false): StructType = {
+    val (inputColNames, outputColNames) = getInOutCols()
+
+    val outputFields = for (i <- 0 until inputColNames.length) yield {
+      if (schema.fieldNames.contains(inputColNames(i))) {
+        validateAndTransformField(schema, inputColNames(i), outputColNames(i))
+      } else {
+        if (skipNonExistsCol) {
+          null
+        } else {
+          throw new SparkException(s"Input column ${inputColNames(i)} does not exist.")
+        }
+      }
+    }
+    StructType(schema.fields ++ outputFields.filter(_ != null))
--- End diff --

Then you don't need to filter with the above code.
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183257676

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)

-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-    if (!dataset.schema.fieldNames.contains($(inputCol))) {
-      logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " +
-        "Skip StringIndexerModel.")
-      return dataset.toDF
-    }
-    transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)

-    val filteredLabels = getHandleInvalid match {
-      case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-      case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = {
--- End diff --

Please add a comment about what counts as invalid data.
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183254078

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +161,53 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)

+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, value)
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
     transformSchema(dataset.schema, logging = true)
-    val values = dataset.na.drop(Array($(inputCol)))
-      .select(col($(inputCol)).cast(StringType))
-      .rdd.map(_.getString(0))
-    val labels = $(stringOrderType) match {
-      case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2)
-        .map(_._1).toArray
-      case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2)
-        .map(_._1).toArray
-      case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _)
-      case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _)
-    }
-    copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+    val (inputCols, _) = getInOutCols()
+    val zeroState = Array.fill(inputCols.length)(new OpenHashMap[String, Long]())
+
+    // Counts by the string values in the dataset.
+    val countByValueArray = dataset.na.drop(inputCols)
+      .select(inputCols.map(col(_).cast(StringType)): _*)
+      .rdd.treeAggregate(zeroState)(
--- End diff --

Is it possible to aggregate natively with SQL? I don't think we would compromise performance by using SQL aggregation (`groupBy`, `agg`, `countDistinct`) instead of tree aggregation, since the state will be very small in this use case.
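The `stringOrderType` dispatch being rewritten here is easy to model in miniature. Below is a pure-Scala sketch of the four orderings over an in-memory list; the real code runs over an RDD (or, per the suggestion above, could run as a SQL aggregation), and note that ties are broken alphabetically here for determinism, which the actual Spark code does not guarantee:

```scala
// Miniature model of StringIndexer's four label orderings over a local Seq.
def orderLabels(values: Seq[String], orderType: String): Seq[String] = {
  // Frequency of each distinct value, e.g. Seq(("b", 2L), ("a", 1L)).
  lazy val counts: Seq[(String, Long)] =
    values.groupBy(identity).map { case (v, vs) => (v, vs.size.toLong) }.toSeq
  orderType match {
    case "frequencyDesc" => counts.sortBy { case (v, c) => (-c, v) }.map(_._1)
    case "frequencyAsc"  => counts.sortBy { case (v, c) => (c, v) }.map(_._1)
    case "alphabetDesc"  => values.distinct.sorted(Ordering[String].reverse)
    case "alphabetAsc"   => values.distinct.sorted
  }
}
```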
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183258353

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)

-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-    if (!dataset.schema.fieldNames.contains($(inputCol))) {
-      logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " +
-        "Skip StringIndexerModel.")
-      return dataset.toDF
-    }
-    transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)

-    val filteredLabels = getHandleInvalid match {
-      case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-      case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = {
+    var filteredDataset = dataset.na.drop(inputColNames.filter(
+      dataset.schema.fieldNames.contains(_)))
+    for (i <- 0 until inputColNames.length) {
+      val inputColName = inputColNames(i)
+      val labelToIndex = labelsToIndexArray(i)
+      val filterer = udf { label: String =>
+        labelToIndex.contains(label)
+      }
+      filteredDataset = filteredDataset.where(filterer(dataset(inputColName)))
     }
+    filteredDataset
+  }

-    val metadata = NominalAttribute.defaultAttr
-      .withName($(outputCol)).withValues(filteredLabels).toMetadata()
-    // If we are skipping invalid records, filter them out.
-    val (filteredDataset, keepInvalid) = $(handleInvalid) match {
-      case StringIndexer.SKIP_INVALID =>
-        val filterer = udf { label: String =>
-          labelToIndex.contains(label)
-        }
-        (dataset.na.drop(Array($(inputCol))).where(filterer(dataset($(inputCol)))), false)
-      case _ => (dataset, getHandleInvalid == StringIndexer.KEEP_INVALID)
-    }
+  private def getIndexer(labels: Seq[String], labelToIndex: OpenHashMap[String, Double]) = {
+    val keepInvalid = (getHandleInvalid == StringIndexer.KEEP_INVALID)

-    val indexer = udf { label: String =>
+    udf { label: String =>
--- End diff --

This requires calling many UDFs for different input columns. Should we combine them into one UDF? The `filteredDataset` logic could go in there as well, to avoid multiple lookups.
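One reading of "combine them in one UDF": take all of a row's input values at once and do a single map lookup per column, returning either all indices or a signal that the row is invalid and should be skipped. A plain-Scala model of that shape, with illustrative names and plain `Map`s standing in for the per-column `OpenHashMap`s:

```scala
// Index every column's value in one call; None means some column holds an
// invalid label and the row should be filtered out (the SKIP_INVALID case).
def indexRow(
    values: Seq[String],
    labelToIndexPerCol: Seq[Map[String, Double]]): Option[Seq[Double]] = {
  // One map.get per column: the lookup doubles as the validity check.
  val indexed = values.zip(labelToIndexPerCol).map { case (v, m) => m.get(v) }
  if (indexed.forall(_.isDefined)) Some(indexed.map(_.get)) else None
}
```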
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183257799

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -217,33 +295,32 @@ class StringIndexerModel (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)

-  @Since("2.0.0")
-  override def transform(dataset: Dataset[_]): DataFrame = {
-    if (!dataset.schema.fieldNames.contains($(inputCol))) {
-      logInfo(s"Input column ${$(inputCol)} does not exist during transformation. " +
-        "Skip StringIndexerModel.")
-      return dataset.toDF
-    }
-    transformSchema(dataset.schema, logging = true)
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)

-    val filteredLabels = getHandleInvalid match {
-      case StringIndexer.KEEP_INVALID => labels :+ "__unknown"
-      case _ => labels
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, value)
+
+  private def filterInvalidData(dataset: Dataset[_], inputColNames: Seq[String]): Dataset[_] = {
+    var filteredDataset = dataset.na.drop(inputColNames.filter(
+      dataset.schema.fieldNames.contains(_)))
+    for (i <- 0 until inputColNames.length) {
+      val inputColName = inputColNames(i)
+      val labelToIndex = labelsToIndexArray(i)
+      val filterer = udf { label: String =>
+        labelToIndex.contains(label)
+      }
+      filteredDataset = filteredDataset.where(filterer(dataset(inputColName)))
--- End diff --

Is it possible to not use `var filteredDataset`?
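The `var`-free version of this loop is the classic fold-over-filters shape: accumulate the dataset through `foldLeft`, applying one filter per (column, valid-labels) pair. With plain collections standing in for the `Dataset`, so this is the pattern rather than the Spark code:

```scala
// Chain one filter per (column, valid-labels) pair without a mutable var,
// mirroring the repeated `where` calls in filterInvalidData.
def filterValidRows(
    rows: Seq[Map[String, String]],
    validByCol: Seq[(String, Set[String])]): Seq[Map[String, String]] =
  validByCol.foldLeft(rows) { case (remaining, (colName, valid)) =>
    // Keep a row only if the column exists and its value is a known label.
    remaining.filter(row => row.get(colName).exists(valid))
  }
```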
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r183253904

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -79,26 +80,56 @@ private[feature] trait StringIndexerBase extends Params with HasHandleInvalid wi
   @Since("2.3.0")
   def getStringOrderType: String = $(stringOrderType)

-  /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType): StructType = {
-    val inputColName = $(inputCol)
+  /** Returns the input and output column names corresponding in pair. */
+  private[feature] def getInOutCols(): (Array[String], Array[String]) = {
+    ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), Seq(outputCols))
+
+    if (isSet(inputCol)) {
+      (Array($(inputCol)), Array($(outputCol)))
+    } else {
+      require($(inputCols).length == $(outputCols).length,
+        "The number of input columns does not match output columns")
+      ($(inputCols), $(outputCols))
+    }
+  }
+
+  private def validateAndTransformField(
+      schema: StructType,
+      inputColName: String,
+      outputColName: String): StructField = {
     val inputDataType = schema(inputColName).dataType
     require(inputDataType == StringType || inputDataType.isInstanceOf[NumericType],
       s"The input column $inputColName must be either string type or numeric type, " +
         s"but got $inputDataType.")
-    val inputFields = schema.fields
-    val outputColName = $(outputCol)
-    require(inputFields.forall(_.name != outputColName),
+    require(schema.fields.forall(_.name != outputColName),
      s"Output column $outputColName already exists.")
-    val attr = NominalAttribute.defaultAttr.withName($(outputCol))
-    val outputFields = inputFields :+ attr.toStructField()
-    StructType(outputFields)
+    NominalAttribute.defaultAttr.withName($(outputCol)).toStructField()
+  }
+
+  /** Validates and transforms the input schema. */
+  protected def validateAndTransformSchema(
+      schema: StructType,
+      skipNonExistsCol: Boolean = false): StructType = {
+    val (inputColNames, outputColNames) = getInOutCols()
+
+    val outputFields = for (i <- 0 until inputColNames.length) yield {
--- End diff --

Nit, why not the following for readability?

```scala
val outputFields = inputColNames.zip(outputColNames).flatMap {
  case (inputColName, outputColName) =>
    schema.fieldNames.contains(inputColName) match {
      case true => Some(validateAndTransformField(schema, inputColName, outputColName))
      case false if skipNonExistsCol => None
      case false => throw new SparkException(s"Input column $inputColName does not exist.")
    }
}
```
[GitHub] spark issue #21056: [SPARK-23849][SQL] Tests for samplingRatio of json datas...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21056 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89692/ Test PASSed.
[GitHub] spark issue #21056: [SPARK-23849][SQL] Tests for samplingRatio of json datas...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21056 Merged build finished. Test PASSed.
[GitHub] spark issue #21056: [SPARK-23849][SQL] Tests for samplingRatio of json datas...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21056

**[Test build #89692 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89692/testReport)** for PR 21056 at commit [`1b86df3`](https://github.com/apache/spark/commit/1b86df3293612ef1db80220c8d8e71a4b917a5c7).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20937 Merged build finished. Test PASSed.
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20937 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89691/ Test PASSed.
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20937

**[Test build #89691 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89691/testReport)** for PR 20937 at commit [`482b799`](https://github.com/apache/spark/commit/482b79969b9e0cc475e63b415051b32423facef4).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #21071: [SPARK-21962][CORE] Distributed Tracing in Spark
Github user devaraj-kavali closed the pull request at: https://github.com/apache/spark/pull/21071
[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183253723

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression {

   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Returns the maximum value in the array.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array("d", "a", null, "b"));
+       [("d",0),("a",1),(null,2),("b",3)]
+      > SELECT _FUNC_(array("d", "a", null, "b"), true);
+       [(0,"d"),(1,"a"),(2,null),(3,"b")]
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ZipWithIndex(child: Expression, indexFirst: Expression)
+  extends UnaryExpression with ExpectsInputTypes {
+
+  def this(e: Expression) = this(e, Literal.FalseLiteral)
+
+  private val idxFirst: Boolean = indexFirst match {
+    case Literal(v: Boolean, BooleanType) => v
+    case _ => throw new AnalysisException("The second argument has to be a boolean constant.")
+  }
+
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
+
+  lazy val childArrayType: ArrayType = child.dataType.asInstanceOf[ArrayType]
+
+  override def dataType: DataType = {
+    val elementField = StructField("value", childArrayType.elementType, childArrayType.containsNull)
+    val indexField = StructField("index", IntegerType, false)
+
+    val fields = if (idxFirst) Seq(indexField, elementField) else Seq(elementField, indexField)
+
+    ArrayType(StructType(fields), false)
+  }
+
+  override protected def nullSafeEval(input: Any): Any = {
+    val array = input.asInstanceOf[ArrayData].toObjectArray(childArrayType.elementType)
+
+    val makeStruct = (v: Any, i: Int) => if (idxFirst) InternalRow(i, v) else InternalRow(v, i)
+    val resultData = array.zipWithIndex.map { case (v, i) => makeStruct(v, i) }
+
+    new GenericArrayData(resultData)
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, c => {
+      if (CodeGenerator.isPrimitiveType(childArrayType.elementType)) {
+        genCodeForPrimitiveElements(ctx, c, ev.value)
+      } else {
+        genCodeForNonPrimitiveElements(ctx, c, ev.value)
+      }
+    })
+  }
+
+  private def genCodeForPrimitiveElements(
+      ctx: CodegenContext,
+      childVariableName: String,
+      arrayData: String): String = {
+    val numElements = ctx.freshName("numElements")
+    val byteArraySize = ctx.freshName("byteArraySize")
+    val data = ctx.freshName("byteArray")
+    val unsafeRow = ctx.freshName("unsafeRow")
+    val structSize = ctx.freshName("structSize")
+    val unsafeArrayData = ctx.freshName("unsafeArrayData")
+    val structsOffset = ctx.freshName("structsOffset")
+    val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray"
+    val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes"
+
+    val baseOffset = Platform.BYTE_ARRAY_OFFSET
+    val longSize = LongType.defaultSize
+    val primitiveValueTypeName = CodeGenerator.primitiveTypeName(childArrayType.elementType)
+    val (valuePosition, indexPosition) = if (idxFirst) ("1", "0") else ("0", "1")
+
+    s"""
+       |final int $numElements = $childVariableName.numElements();
+       |final int $structSize = ${UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2};
+       |final long $byteArraySize = $calculateArraySize($numElements, $longSize + $structSize);
+       |final int $structsOffset = $calculateHeader($numElements) + $numElements * $longSize;
+       |if ($byteArraySize > $MAX_ARRAY_LENGTH) {
--- End diff --

I like your suggestion. So instead of throwing the exception, the function will execute a similar piece of code as in `genCodeForNonPrimitiveElements`...
[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183253226

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression {

   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Returns the maximum value in the array.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(array[, indexFirst]) - Transforms the input array by encapsulating elements into pairs with indexes indicating the order.",
--- End diff --

That's a really good question! The newly added functions `element_at` and `array_position` are 1-based, but on the other hand, `getItem` from the `Column` class is 0-based. What about adding one extra parameter and letting users decide whether the array will be indexed from 0 or 1?
[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21121#discussion_r183252854

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -883,3 +884,140 @@ case class Concat(children: Seq[Expression]) extends Expression {

   override def sql: String = s"concat(${children.map(_.sql).mkString(", ")})"
 }
+
+/**
+ * Returns the maximum value in the array.
--- End diff --

Good spot. Thanks!
[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21082 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2568/ Test PASSed.
[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21082 Merged build finished. Test PASSed.
[GitHub] spark issue #21082: [SPARK-22239][SQL][Python] Enable grouped aggregate pand...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21082 **[Test build #89693 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89693/testReport)** for PR 21082 at commit [`27158d9`](https://github.com/apache/spark/commit/27158d9873d54de9312db9e2db5c88d430589ade).
[GitHub] spark issue #21056: [SPARK-23849][SQL] Tests for samplingRatio of json datas...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21056 **[Test build #89692 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89692/testReport)** for PR 21056 at commit [`1b86df3`](https://github.com/apache/spark/commit/1b86df3293612ef1db80220c8d8e71a4b917a5c7).
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20937 **[Test build #89691 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89691/testReport)** for PR 20937 at commit [`482b799`](https://github.com/apache/spark/commit/482b79969b9e0cc475e63b415051b32423facef4).
[GitHub] spark issue #21071: [SPARK-21962][CORE] Distributed Tracing in Spark
Github user rxin commented on the issue: https://github.com/apache/spark/pull/21071 @devaraj-kavali can you close this PR first? Looks like there isn't any reason to really use htrace anymore ...
[GitHub] spark issue #20940: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user edwinalu commented on the issue: https://github.com/apache/spark/pull/20940

Could a committer please request a retest? It looks like the tests passed (https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89685/testReport/), and the failure occurs after posting to GitHub:

    Attempting to post to Github...
    [error] running /home/jenkins/workspace/SparkPullRequestBuilder/build/sbt -Phadoop-2.6 -Pkubernetes -Pflume -Phive-thriftserver -Pyarn -Pkafka-0-8 -Phive -Pkinesis-asl -Pmesos test ; process was terminated by signal 9
    > Post successful.
    Build step 'Execute shell' marked build as failure
    Archiving artifacts
    Recording test results
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89685/
    Test FAILed.
    Finished: FAILURE
[GitHub] spark issue #20933: [SPARK-23817][SQL]Migrate ORC file format read path to d...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20933 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89690/ Test FAILed.
[GitHub] spark issue #20933: [SPARK-23817][SQL]Migrate ORC file format read path to d...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20933 Merged build finished. Test FAILed.
[GitHub] spark issue #20933: [SPARK-23817][SQL]Migrate ORC file format read path to d...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20933

**[Test build #89690 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89690/testReport)** for PR 20933 at commit [`359f846`](https://github.com/apache/spark/commit/359f846112ba8c7ee9023b7754da4a907068b39b).

* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends Rule[LogicalPlan]`
  * `abstract class FileDataSourceV2 extends DataSourceV2`
  * `class OrcDataSourceV2 extends FileDataSourceV2 with ReadSupport with ReadSupportWithSchema`
[GitHub] spark issue #21031: [SPARK-23923][SQL] Add cardinality function
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21031 Sure, done.
[GitHub] spark issue #21123: [SPARK-24045][SQL]Create base class for file data source...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21123 Merged build finished. Test PASSed.
[GitHub] spark issue #21123: [SPARK-24045][SQL]Create base class for file data source...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21123 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89689/ Test PASSed.
[GitHub] spark issue #21123: [SPARK-24045][SQL]Create base class for file data source...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21123

**[Test build #89689 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89689/testReport)** for PR 21123 at commit [`95628e5`](https://github.com/apache/spark/commit/95628e5a027d029be7dcc4e8e979555bc5e0e4a3).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class FallBackFileDataSourceToV1(sparkSession: SparkSession) extends Rule[LogicalPlan]`
  * `trait FileDataSourceV2 extends DataSourceV2 with DataSourceRegister`
[GitHub] spark issue #20933: [SPARK-23817][SQL]Migrate ORC file format read path to d...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20933 Merged build finished. Test PASSed.
[GitHub] spark issue #20933: [SPARK-23817][SQL]Migrate ORC file format read path to d...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20933 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2567/ Test PASSed.