[GitHub] spark pull request #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesM...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23272#discussion_r240204508

--- Diff: core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java ---
@@ -667,4 +669,54 @@ public void testPeakMemoryUsed() {
     }
   }

+  @Test
+  public void avoidDeadlock() throws InterruptedException {
+    memoryManager.limit(PAGE_SIZE_BYTES);
+    MemoryMode mode = useOffHeapMemoryAllocator() ? MemoryMode.OFF_HEAP: MemoryMode.ON_HEAP;
+    TestMemoryConsumer c1 = new TestMemoryConsumer(taskMemoryManager, mode);
+    BytesToBytesMap map =
+      new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024);
+
+    Runnable memoryConsumer = new Runnable() {
+      @Override
+      public void run() {
+        int i = 0;
+        long used = 0;
+        while (i < 10) {
+          c1.use(1000);
+          used += 1000;
+          i++;
+        }
+        c1.free(used);
+      }
+    };
+
+    Thread thread = new Thread(memoryConsumer);
+
+    try {
+      int i;
+      for (i = 0; i < 1024; i++) {
+        final long[] arr = new long[]{i};
+        final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
+        loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+      }
+
+      // Starts to require memory at another memory consumer.
+      thread.start();
+
+      BytesToBytesMap.MapIterator iter = map.destructiveIterator();
+      for (i = 0; i < 1024; i++) {
+        iter.next();
+      }
+      assertFalse(iter.hasNext());
+    } finally {
+      map.free();
+      thread.join();
--- End diff --

This line just makes sure `memoryConsumer` ends and frees its memory.
[GitHub] spark pull request #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesM...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23272#discussion_r240204245

--- Diff: core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java ---
@@ -667,4 +669,54 @@ public void testPeakMemoryUsed() {
     }
   }

+  @Test
+  public void avoidDeadlock() throws InterruptedException {
+    memoryManager.limit(PAGE_SIZE_BYTES);
+    MemoryMode mode = useOffHeapMemoryAllocator() ? MemoryMode.OFF_HEAP: MemoryMode.ON_HEAP;
+    TestMemoryConsumer c1 = new TestMemoryConsumer(taskMemoryManager, mode);
+    BytesToBytesMap map =
+      new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024);
+
+    Runnable memoryConsumer = new Runnable() {
+      @Override
+      public void run() {
+        int i = 0;
+        long used = 0;
+        while (i < 10) {
+          c1.use(1000);
+          used += 1000;
+          i++;
+        }
+        c1.free(used);
+      }
+    };
+
+    Thread thread = new Thread(memoryConsumer);
+
+    try {
+      int i;
+      for (i = 0; i < 1024; i++) {
+        final long[] arr = new long[]{i};
+        final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
+        loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+      }
+
+      // Starts to require memory at another memory consumer.
+      thread.start();
+
+      BytesToBytesMap.MapIterator iter = map.destructiveIterator();
+      for (i = 0; i < 1024; i++) {
+        iter.next();
+      }
+      assertFalse(iter.hasNext());
+    } finally {
+      map.free();
+      thread.join();
--- End diff --

Even without this line, the test still hangs: the test thread deadlocks with the other thread running `memoryConsumer`.
[GitHub] spark pull request #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesM...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23272#discussion_r240187707

--- Diff: core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java ---
@@ -38,12 +38,14 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
     return used;
   }

-  void use(long size) {
+  @VisibleForTesting
--- End diff --

Ok.
[GitHub] spark pull request #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesM...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23272#discussion_r240187678

--- Diff: core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java ---
@@ -667,4 +668,53 @@ public void testPeakMemoryUsed() {
     }
   }

+  @Test
+  public void avoidDeadlock() throws InterruptedException {
--- End diff --

I've tried a few ways to add timeout logic, but they don't work: the deadlock hangs the test thread.
[GitHub] spark issue #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesMap.MapI...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23272

> have you seen any bug report caused by this dead lock?

The original reporter of the JIRA ticket SPARK-26265 hit this bug in their workload.
[GitHub] spark pull request #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesM...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23272#discussion_r240170574

--- Diff: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---
@@ -255,11 +255,18 @@ private MapIterator(int numRecords, Location loc, boolean destructive) {
     }

     private void advanceToNextPage() {
+      // SPARK-26265: We will first lock this `MapIterator` and then `TaskMemoryManager` when going
+      // to free a memory page by calling `freePage`. At the same time, it is possibly that another
+      // memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it
+      // acquires memory and causes spilling on this `MapIterator`. To avoid deadlock here, we keep
+      // reference to the page to free and free it after releasing the lock of `MapIterator`.
--- End diff --

Yeah, OFF_HEAP also suffers from this issue. One change is needed for this test case to cover it. Thanks.
[GitHub] spark issue #23269: [SPARK-26316] Currently the wrong implementation in the ...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23269

hmm, I think the PR title is too long... Maybe just `Revert hash join metrics that cause performance degradation`?
[GitHub] spark issue #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesMap.MapI...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23272

cc @cloud-fan
[GitHub] spark pull request #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesM...
GitHub user viirya opened a pull request: https://github.com/apache/spark/pull/23272

[SPARK-26265][Core] Fix deadlock in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator and TaskMemoryManager

## What changes were proposed in this pull request?

In `BytesToBytesMap.MapIterator.advanceToNextPage`, we will first lock this `MapIterator` and then `TaskMemoryManager` when going to free a memory page by calling `freePage`. At the same time, it is possible that another memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it acquires memory and causes spilling on this `MapIterator`. So we end up with one thread holding the lock on the `MapIterator` object while waiting for the lock on `TaskMemoryManager`, and the other consumer holding the lock on `TaskMemoryManager` while waiting for the lock on the `MapIterator` object.

To avoid the deadlock, this patch keeps a reference to the page to free and frees it after releasing the lock on the `MapIterator`.

## How was this patch tested?

Added a test, and manually tested by running the test 100 times to make sure there is no deadlock.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 SPARK-26265

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23272.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #23272

commit 25e8e068047b714f27706399e1e6c03c338ac178
Author: Liang-Chi Hsieh
Date:   2018-12-10T07:59:09Z

    Fix deadlock in BytesToBytesMap.
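For readers skimming the thread, the fix is a classic lock-ordering repair: never hold the `MapIterator` lock while acquiring the `TaskMemoryManager` lock. The following is a minimal, self-contained sketch of the shape of the bug and the fix; `iteratorLock`, `memoryManagerLock`, and `pageToFree` are simplified stand-ins, not the actual Spark code.

```scala
// Illustrative only: stand-ins for the MapIterator and TaskMemoryManager locks.
class LockOrderingSketch {
  private val iteratorLock = new Object      // plays the role of MapIterator
  private val memoryManagerLock = new Object // plays the role of TaskMemoryManager
  private var pageToFree: Option[AnyRef] = Some(new Object)

  // Deadlock-prone shape: lock 1 is held while lock 2 is acquired. A thread
  // taking the two locks in the opposite order can deadlock with this one.
  def advanceToNextPageUnsafe(): Unit = iteratorLock.synchronized {
    memoryManagerLock.synchronized {
      pageToFree = None // "freePage" happens here, under both locks
    }
  }

  // Fixed shape (what the patch does): remember the page under lock 1,
  // release lock 1, and only then take lock 2 to free the page.
  def advanceToNextPageSafe(): Unit = {
    val page = iteratorLock.synchronized {
      val p = pageToFree
      pageToFree = None
      p
    }
    page.foreach { _ =>
      memoryManagerLock.synchronized {
        // free the page here; lock 1 is no longer held
      }
    }
  }
}
```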
[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r240041688

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
     expressions.flatMap(collectEvaluableUDFs)
   }

-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case plan: LogicalPlan => extract(plan)
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    // SPARK-26293: A subquery will be rewritten into join later, and will go through this rule
+    // eventually. Here we skip subquery, as Python UDF only needs to be extracted once.
+    case _: Subquery => plan
--- End diff --

I'm not sure it is totally ok to skip `Subquery` for all optimizer rules. For `ExtractPythonUDFs` I think it is ok, because `ExtractPythonUDFs` runs after the rules in `RewriteSubquery`, so we can skip it here and extract the Python UDFs after the subqueries are rewritten into joins. But for the rules that run before `RewriteSubquery`, if we skip them on `Subquery`, we get no chance to apply them after the subqueries are rewritten into joins.
[GitHub] spark pull request #23263: [SPARK-23674][ML] Adds Spark ML Events
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23263#discussion_r240003952

--- Diff: mllib/src/main/scala/org/apache/spark/ml/Estimator.scala ---
@@ -65,7 +65,19 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage {
    * Fits a model to the input data.
    */
   @Since("2.0.0")
-  def fit(dataset: Dataset[_]): M
+  def fit(dataset: Dataset[_]): M = MLEvents.withFitEvent(this, dataset) {
+    fitImpl(dataset)
+  }
+
+  /**
+   * `fit()` handles events and then calls this method. Subclasses should override this
+   * method to implement the actual fiting a model to the input data.
+   */
+  @Since("3.0.0")
+  protected def fitImpl(dataset: Dataset[_]): M = {
+    // Keep this default body for backward compatibility.
+    throw new UnsupportedOperationException("fitImpl is not implemented.")
--- End diff --

With the current change, can Spark ML developers still choose to override `fit` instead of `fitImpl`, so their ML models work without ML events?
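A compile-ready toy version of the template-method pattern under discussion may help; all names below are stand-ins, not the real Spark ML API.

```scala
// Stand-in for MLEvents.withFitEvent: posts start/end events around the body.
object ToyEvents {
  def withFitEvent[M](body: => M): M = {
    println("FitStart"); val model = body; println("FitEnd"); model
  }
}

abstract class ToyEstimator[M] {
  // Template method: event handling wraps the actual fitting logic.
  def fit(data: Seq[Double]): M = ToyEvents.withFitEvent { fitImpl(data) }
  protected def fitImpl(data: Seq[Double]): M =
    throw new UnsupportedOperationException("fitImpl is not implemented.")
}

// Overriding fitImpl keeps the event wrapper.
class NewStyle extends ToyEstimator[String] {
  override protected def fitImpl(data: Seq[Double]): String = s"model(${data.sum})"
}

// Overriding fit directly still compiles but bypasses the wrapper, so no
// events are emitted; this is exactly the compatibility question raised above.
class OldStyle extends ToyEstimator[String] {
  override def fit(data: Seq[Double]): String = s"model(${data.sum})"
}
```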
[GitHub] spark pull request #23263: [SPARK-23674][ML] Adds Spark ML Events
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23263#discussion_r240003674

--- Diff: mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+
+import org.apache.hadoop.fs.Path
+import org.mockito.Matchers.{any, eq => meq}
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
+import org.scalatest.mockito.MockitoSugar.mock
+
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.ml.param.ParamMap
+import org.apache.spark.ml.util._
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql._
+import org.apache.spark.util.Utils
+
+
+class MLEventsSuite
+    extends SparkFunSuite
+    with BeforeAndAfterEach
+    with DefaultReadWriteTest
+    with Eventually {
+
+  private var spark: SparkSession = _
+  private var sc: SparkContext = _
+  private var checkpointDir: String = _
+  private var listener: SparkListener = _
+  private val dirName: String = "pipeline"
+  private val events = mutable.ArrayBuffer.empty[MLEvent]
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    sc = new SparkContext("local[2]", "SparkListenerSuite")
+    listener = new SparkListener {
+      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+        case e: FitStart[_] => events.append(e)
+        case e: FitEnd[_] => events.append(e)
+        case e: TransformStart => events.append(e)
+        case e: TransformEnd => events.append(e)
+        case e: SaveInstanceStart if e.path.endsWith(dirName) => events.append(e)
+        case e: SaveInstanceEnd if e.path.endsWith(dirName) => events.append(e)
+        case _ =>
+      }
+    }
+    sc.addSparkListener(listener)
+
+    spark = SparkSession.builder()
+      .sparkContext(sc)
+      .getOrCreate()
+
+    checkpointDir = Utils.createDirectory(tempDir.getCanonicalPath, "checkpoints").toString
+    sc.setCheckpointDir(checkpointDir)
--- End diff --

I may have missed it, but where do we use the checkpoint?
[GitHub] spark pull request #23263: [SPARK-23674][ML] Adds Spark ML Events
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23263#discussion_r240003563

--- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
@@ -132,7 +132,8 @@ class Pipeline @Since("1.4.0") (
    * @return fitted pipeline
    */
   @Since("2.0.0")
-  override def fit(dataset: Dataset[_]): PipelineModel = {
+  override def fit(dataset: Dataset[_]): PipelineModel = super.fit(dataset)
--- End diff --

Is there any `fit` method that doesn't call `super.fit()`?
[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23253#discussion_r240003434

--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -37,6 +37,8 @@ displayTitle: Spark SQL Upgrading Guide

   - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.

+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully.
+
--- End diff --

Ok. Sounds reasonable.
[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23253#discussion_r239998453

--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -37,6 +37,8 @@ displayTitle: Spark SQL Upgrading Guide

   - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.

+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully.
+
--- End diff --

For such a behavior change, shall we add a config to roll back to the previous behavior?
[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23253#discussion_r239998485

--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -37,6 +37,8 @@ displayTitle: Spark SQL Upgrading Guide

   - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.

+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully.
+
--- End diff --

And you should also update the other places that define the previous behavior, like `DataFrameReader`.
[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/20146

retest this please.
[GitHub] spark pull request #23259: [SPARK-26215][SQL][WIP] Define reserved/non-reser...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23259#discussion_r239994385

--- Diff: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---
@@ -769,7 +774,7 @@ nonReserved
     | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | RECOVER | EXPORT | IMPORT | LOAD | VALUES
     | COMMENT | ROLE | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS
     | OPTION | LOCAL | INPATH | ASC | DESC | LIMIT | RENAME | SETS
-    | AT | NULLS | OVERWRITE | ALL | ANY | ALTER | AS | BETWEEN | BY | CREATE | DELETE
+    | AT | NULLS | OVERWRITE | ANY | ALTER | AS | BETWEEN | BY | CREATE | DELETE
--- End diff --

Doesn't `ANY` also move to `reserved`?
[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23253#discussion_r239994326

--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -37,6 +37,8 @@ displayTitle: Spark SQL Upgrading Guide

   - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.

+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully.
+
--- End diff --

What if there is no corrupt column?
[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/20146

Thanks @holdenk for reviewing! I've resolved some comments and replied to the others. Please take a look. Thanks.
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239994064

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)

+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, value)
+
+  private def countByValue(
+      dataset: Dataset[_],
+      inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+    val aggregator = new StringIndexerAggregator(inputCols.length)
+    implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]]
+
+    dataset.select(inputCols.map(col(_).cast(StringType)): _*)
+      .toDF
+      .groupBy().agg(aggregator.toColumn)
+      .as[Array[OpenHashMap[String, Long]]]
+      .collect()(0)
+  }
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
     transformSchema(dataset.schema, logging = true)
-    val values = dataset.na.drop(Array($(inputCol)))
-      .select(col($(inputCol)).cast(StringType))
-      .rdd.map(_.getString(0))
-    val labels = $(stringOrderType) match {
-      case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2)
-        .map(_._1).toArray
-      case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2)
-        .map(_._1).toArray
-      case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _)
-      case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _)
-    }
-    copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+    val (inputCols, _) = getInOutCols()
+
+    val filteredDF = dataset.na.drop(inputCols)
+
+    // In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet.
+    val labelsArray = $(stringOrderType) match {
+      case StringIndexer.frequencyDesc =>
+        countByValue(filteredDF, inputCols).map { counts =>
+          counts.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray
--- End diff --

As one sorts by string and the other sorts by count, can we replace them with a single sortBy on a compound key?
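To make the suggestion concrete, here is a minimal, self-contained sketch of the single-sortBy variant, assuming `counts` is a `Seq[(String, Long)]` of label/count pairs as in the diff:

```scala
// One sortBy with a compound key: primary key is descending count, secondary
// key is the label. Relies on the implicit element-wise Ordering for tuples.
val counts: Seq[(String, Long)] = Seq(("b", 3L), ("a", 3L), ("c", 1L))
val labels: Array[String] =
  counts.sortBy { case (label, count) => (-count, label) }.map(_._1).toArray
// labels == Array("a", "b", "c"): ties on count fall back to alphabetical order.
```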
[GitHub] spark pull request #23258: [SPARK-23375][SQL][FOLLOWUP][TEST] Test Sort metr...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23258#discussion_r239993927

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -182,10 +182,13 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   }

   test("Sort metrics") {
-    // Assume the execution plan is
-    // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1))
-    val ds = spark.range(10).sort('id)
-    testSparkPlanMetrics(ds.toDF(), 2, Map.empty)
+    // Assume the execution plan with node id is
+    // Sort(nodeId = 0)
+    //   Exchange(nodeId = 1)
+    //     Range(nodeId = 2)
+    val df = spark.range(9, -1, -1).sort('id).toDF()
--- End diff --

Either is fine to me, as we now add an assert to make sure the Sort node exists.
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239993480

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)

+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, value)
+
+  private def countByValue(
+      dataset: Dataset[_],
+      inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+    val aggregator = new StringIndexerAggregator(inputCols.length)
+    implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]]
+
+    dataset.select(inputCols.map(col(_).cast(StringType)): _*)
+      .toDF
+      .groupBy().agg(aggregator.toColumn)
+      .as[Array[OpenHashMap[String, Long]]]
+      .collect()(0)
+  }
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
     transformSchema(dataset.schema, logging = true)
-    val values = dataset.na.drop(Array($(inputCol)))
-      .select(col($(inputCol)).cast(StringType))
-      .rdd.map(_.getString(0))
-    val labels = $(stringOrderType) match {
-      case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2)
-        .map(_._1).toArray
-      case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2)
-        .map(_._1).toArray
-      case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _)
-      case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _)
-    }
-    copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+    val (inputCols, _) = getInOutCols()
+
+    val filteredDF = dataset.na.drop(inputCols)
+
+    // In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet.
--- End diff --

I'll also add it to the ML migration guide.
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239992942

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -310,11 +439,23 @@ object StringIndexerModel extends MLReadable[StringIndexerModel] {
     override def load(path: String): StringIndexerModel = {
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
       val dataPath = new Path(path, "data").toString
-      val data = sparkSession.read.parquet(dataPath)
-        .select("labels")
-        .head()
-      val labels = data.getAs[Seq[String]](0).toArray
-      val model = new StringIndexerModel(metadata.uid, labels)
+
+      val (majorVersion, minorVersion) = majorMinorVersion(metadata.sparkVersion)
+      val labelsArray = if (majorVersion < 2 || (majorVersion == 2 && minorVersion <= 3)) {
--- End diff --

But this needs to change for Spark 3.0 now.
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239992845

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -310,11 +439,23 @@ object StringIndexerModel extends MLReadable[StringIndexerModel] {
     override def load(path: String): StringIndexerModel = {
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
       val dataPath = new Path(path, "data").toString
-      val data = sparkSession.read.parquet(dataPath)
-        .select("labels")
-        .head()
-      val labels = data.getAs[Seq[String]](0).toArray
-      val model = new StringIndexerModel(metadata.uid, labels)
+
+      val (majorVersion, minorVersion) = majorMinorVersion(metadata.sparkVersion)
+      val labelsArray = if (majorVersion < 2 || (majorVersion == 2 && minorVersion <= 3)) {
--- End diff --

This is for loading an old StringIndexerModel saved by a previous Spark version. The old model has `labels`, but the new model has `labelsArray`.
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239992579

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)

+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, value)
+
+  private def countByValue(
+      dataset: Dataset[_],
+      inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+    val aggregator = new StringIndexerAggregator(inputCols.length)
+    implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]]
+
+    dataset.select(inputCols.map(col(_).cast(StringType)): _*)
+      .toDF
+      .groupBy().agg(aggregator.toColumn)
+      .as[Array[OpenHashMap[String, Long]]]
+      .collect()(0)
+  }
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
     transformSchema(dataset.schema, logging = true)
-    val values = dataset.na.drop(Array($(inputCol)))
-      .select(col($(inputCol)).cast(StringType))
-      .rdd.map(_.getString(0))
-    val labels = $(stringOrderType) match {
-      case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2)
-        .map(_._1).toArray
-      case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2)
-        .map(_._1).toArray
-      case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _)
-      case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _)
-    }
-    copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+    val (inputCols, _) = getInOutCols()
+
+    val filteredDF = dataset.na.drop(inputCols)
+
+    // In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet.
+    val labelsArray = $(stringOrderType) match {
+      case StringIndexer.frequencyDesc =>
+        countByValue(filteredDF, inputCols).map { counts =>
+          counts.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray
+        }
+      case StringIndexer.frequencyAsc =>
+        countByValue(filteredDF, inputCols).map { counts =>
+          counts.toSeq.sortBy(_._1).sortBy(_._2).map(_._1).toArray
+        }
+      case StringIndexer.alphabetDesc =>
+        import dataset.sparkSession.implicits._
+        inputCols.map { inputCol =>
+          filteredDF.select(inputCol).distinct().sort(dataset(s"$inputCol").desc)
--- End diff --

That's good. I missed this.
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239992360

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -79,26 +81,53 @@ private[feature] trait StringIndexerBase extends Params with HasHandleInvalid wi
   @Since("2.3.0")
   def getStringOrderType: String = $(stringOrderType)

-  /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType): StructType = {
-    val inputColName = $(inputCol)
+  /** Returns the input and output column names corresponding in pair. */
+  private[feature] def getInOutCols(): (Array[String], Array[String]) = {
+    ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), Seq(outputCols))
+
+    if (isSet(inputCol)) {
+      (Array($(inputCol)), Array($(outputCol)))
+    } else {
+      require($(inputCols).length == $(outputCols).length,
+        "The number of input columns does not match output columns")
+      ($(inputCols), $(outputCols))
+    }
+  }
+
+  private def validateAndTransformField(
+      schema: StructType,
+      inputColName: String,
+      outputColName: String): StructField = {
     val inputDataType = schema(inputColName).dataType
     require(inputDataType == StringType || inputDataType.isInstanceOf[NumericType],
       s"The input column $inputColName must be either string type or numeric type, " +
        s"but got $inputDataType.")
-    val inputFields = schema.fields
-    val outputColName = $(outputCol)
-    require(inputFields.forall(_.name != outputColName),
+    require(schema.fields.forall(_.name != outputColName),
--- End diff --

Sounds good to me. Added.
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239992378

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)

+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, value)
+
+  private def countByValue(
+      dataset: Dataset[_],
+      inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+    val aggregator = new StringIndexerAggregator(inputCols.length)
+    implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]]
+
+    dataset.select(inputCols.map(col(_).cast(StringType)): _*)
+      .toDF
+      .groupBy().agg(aggregator.toColumn)
+      .as[Array[OpenHashMap[String, Long]]]
+      .collect()(0)
+  }
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
     transformSchema(dataset.schema, logging = true)
-    val values = dataset.na.drop(Array($(inputCol)))
-      .select(col($(inputCol)).cast(StringType))
-      .rdd.map(_.getString(0))
-    val labels = $(stringOrderType) match {
-      case StringIndexer.frequencyDesc => values.countByValue().toSeq.sortBy(-_._2)
-        .map(_._1).toArray
-      case StringIndexer.frequencyAsc => values.countByValue().toSeq.sortBy(_._2)
-        .map(_._1).toArray
-      case StringIndexer.alphabetDesc => values.distinct.collect.sortWith(_ > _)
-      case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ < _)
-    }
-    copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+    val (inputCols, _) = getInOutCols()
+
+    val filteredDF = dataset.na.drop(inputCols)
+
+    // In case of equal frequency when frequencyDesc/Asc, we further sort the strings by alphabet.
--- End diff --

Moved to `stringOrderType`'s doc.
[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20146#discussion_r239991238

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)

+  /** @group setParam */
+  @Since("2.4.0")
--- End diff --

Yes. Thanks.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239990986

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -38,13 +38,21 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
-  override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+  private lazy val writeMetrics =
+    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+  private lazy val readMetrics =
+    SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
--- End diff --

I feel it is better to rename `SQLShuffleMetricsReporter` to `SQLShuffleReadMetricsReporter` so it matches `SQLShuffleWriteMetricsReporter`. It can be done in a follow-up.
[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23253#discussion_r239846601

--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -37,6 +37,8 @@ displayTitle: Spark SQL Upgrading Guide

   - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.

+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully.
+
--- End diff --

And this behavior is also defined in some places like `DataFrameReader`.
[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23253#discussion_r239846300

--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -37,6 +37,8 @@ displayTitle: Spark SQL Upgrading Guide

   - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.

+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully.
+
--- End diff --

hmm, if the returned row contains non-`null` fields, how do we know whether the row was read from a bad JSON record or a correct one?
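For readers following along: the usual way to tell the two apart is the corrupt-record column. A sketch, assuming the standard `columnNameOfCorruptRecord` option and an illustrative schema and input path:

```scala
import org.apache.spark.sql.types._

// Add an extra string column to the user schema to capture the raw bad record.
val schema = new StructType()
  .add("a", IntegerType)
  .add("b", IntegerType)
  .add("_corrupt_record", StringType)

val df = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("/path/to/input.json")  // illustrative path

// Rows parsed from bad records have a non-null _corrupt_record, even when
// some of `a`/`b` were converted successfully (the partial results above).
df.filter(df("_corrupt_record").isNotNull).show()
```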
[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/20146

retest this please.
[GitHub] spark pull request #23224: [SPARK-26277][SQL][TEST] WholeStageCodegen metric...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23224#discussion_r239837818

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -80,8 +80,10 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     // Assume the execution plan is
     // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1))
     // TODO: update metrics in generated operators
-    val ds = spark.range(10).filter('id < 5)
-    testSparkPlanMetrics(ds.toDF(), 1, Map.empty)
+    val df = spark.range(10).filter('id < 5).toDF()
+    testSparkPlanMetrics(df, 1, Map.empty, true)
+    df.queryExecution.executedPlan.find(_.isInstanceOf[WholeStageCodegenExec])
+      .getOrElse(assert(false))
--- End diff --

It seems the `Sort metrics` test has a similar issue:

```scala
test("Sort metrics") {
  // Assume the execution plan is
  // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1))
  val ds = spark.range(10).sort('id)
  testSparkPlanMetrics(ds.toDF(), 2, Map.empty)
}
```
[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23249#discussion_r239754619

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -118,10 +115,12 @@ case class HashClusteredDistribution(

 /**
  * Represents data where tuples have been ordered according to the `ordering`
- * [[Expression Expressions]]. This is a strictly stronger guarantee than
- * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the
- * same value for the ordering expressions are contiguous and will never be split across
- * partitions.
+ * [[Expression Expressions]]. Its requirement is defined as the following:
+ *   - Given any 2 adjacent partitions, all the rows of the second partition must be larger than or
+ *     equal to any row in the first partition, according to the `ordering` expressions.
--- End diff --

Why do we need this equality here? Can't we just require all the rows in the second partition to be larger than any row in the first partition?
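A tiny self-contained example of why the `>=` form is needed: a range partitioner may split rows with equal ordering keys across a partition boundary, so only the non-strict guarantee holds.

```scala
// Globally sorted rows, split into two "partitions" at an arbitrary boundary.
val rows = Seq(1, 2, 2, 3).sorted
val (p0, p1) = rows.splitAt(2)  // p0 = List(1, 2), p1 = List(2, 3)

// Every row in p1 is >= every row in p0 ...
assert(p1.forall(b => p0.forall(a => b >= a)))
// ... but not strictly greater: the two 2s straddle the boundary.
assert(!p1.forall(b => p0.forall(a => b > a)))
```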
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r239744338

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
@@ -95,9 +77,116 @@ case class CreateHiveTableAsSelectCommand(
     Seq.empty[Row]
   }

+  // Returns `DataWritingCommand` used to write data when the table exists.
+  def writingCommandForExistingTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand
+
+  // Returns `DataWritingCommand` used to write data when the table doesn't exist.
+  def writingCommandForNewTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand
+
   override def argString: String = {
     s"[Database:${tableDesc.database}, " +
     s"TableName: ${tableDesc.identifier.table}, " +
     s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+    tableDesc: CatalogTable,
+    query: LogicalPlan,
+    outputColumnNames: Seq[String],
+    mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def writingCommandForExistingTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand = {
+    InsertIntoHiveTable(
+      tableDesc,
+      Map.empty,
--- End diff --

That is a different issue. I think it is better to have a separate PR to address it.
[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/20146

ping @dbtsai
[GitHub] spark issue #23239: [SPARK-26021][SQL][followup] only deal with NaN and -0.0...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23239

The migration guide was changed by another follow-up, https://github.com/apache/spark/pull/23141:

> In Spark version 2.4 and earlier, float/double -0.0 is semantically equal to 0.0, but users can still distinguish them via `Dataset.show`, `Dataset.collect` etc. Since Spark 3.0, float/double -0.0 is replaced by 0.0 internally, and users can't distinguish them any more.

Is the above still correct after this change?
[GitHub] spark pull request #23239: [SPARK-26021][SQL][followup] only deal with NaN a...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23239#discussion_r239690045

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java ---
@@ -198,11 +198,45 @@ protected final void writeLong(long offset, long value) {
     Platform.putLong(getBuffer(), offset, value);
   }

+  // We need to take care of NaN and -0.0 in several places:
+  //   1. When compare values, different NaNs should be treated as same, `-0.0` and `0.0` should be
+  //      treated as same.
+  //   2. In range partitioner, different NaNs should belong to the same partition, -0.0 and 0.0
--- End diff --

As this is not a problem, we should update the PR description too.
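To spell out the two cases in the quoted comment, here is a small self-contained demo; the normalization at the end is a simplified sketch of the write-time approach, not the exact Spark code.

```scala
object FloatNormalizationDemo extends App {
  // Case 1: different NaN bit patterns are semantically equal but not byte-wise.
  val nan1 = java.lang.Double.doubleToRawLongBits(Double.NaN)
  val nan2 = java.lang.Double.doubleToRawLongBits(
    java.lang.Double.longBitsToDouble(0x7ff8000000000010L))  // another valid NaN
  println(nan1 == nan2)  // false: a raw binary comparison would split NaNs apart

  // Case 2: -0.0 == 0.0 semantically, but their bits differ too.
  println(0.0 == -0.0)  // true
  println(java.lang.Double.doubleToRawLongBits(0.0) ==
    java.lang.Double.doubleToRawLongBits(-0.0))  // false

  // Normalizing at write time fixes both: canonicalize NaN and collapse -0.0.
  var v = -0.0
  if (v.isNaN) v = Double.NaN  // single canonical NaN bit pattern
  else if (v == -0.0) v = 0.0  // -0.0 becomes 0.0 (no-op for +0.0)
  println(java.lang.Double.doubleToRawLongBits(v) ==
    java.lang.Double.doubleToRawLongBits(0.0))  // true
}
```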
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r239668492

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
@@ -95,9 +77,116 @@ case class CreateHiveTableAsSelectCommand(
     Seq.empty[Row]
   }

+  // Returns `DataWritingCommand` used to write data when the table exists.
+  def writingCommandForExistingTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand
+
+  // Returns `DataWritingCommand` used to write data when the table doesn't exist.
+  def writingCommandForNewTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand
+
   override def argString: String = {
     s"[Database:${tableDesc.database}, " +
     s"TableName: ${tableDesc.identifier.table}, " +
     s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+    tableDesc: CatalogTable,
+    query: LogicalPlan,
+    outputColumnNames: Seq[String],
+    mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def writingCommandForExistingTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand = {
+    InsertIntoHiveTable(
+      tableDesc,
+      Map.empty,
--- End diff --

Yes, I've discussed this with @cloud-fan. DataFrameWriter allows dynamic partitioning on Hive CTAS. For now it seems this syntax is not allowed by SparkSqlParser.
[GitHub] spark pull request #23215: [SPARK-26263][SQL] Validate partition values with...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23215#discussion_r239460682

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala ---
@@ -272,9 +279,14 @@ object PartitioningUtils {
     val literal = if (userSpecifiedDataTypes.contains(columnName)) {
       // SPARK-26188: if user provides corresponding column schema, get the column value without
       // inference, and then cast it as user specified data type.
-      val columnValue = inferPartitionColumnValue(rawColumnValue, false, timeZone)
-      val castedValue =
-        Cast(columnValue, userSpecifiedDataTypes(columnName), Option(timeZone.getID)).eval()
+      val dataType = userSpecifiedDataTypes(columnName)
+      val columnValueLiteral = inferPartitionColumnValue(rawColumnValue, false, timeZone)
+      val columnValue = columnValueLiteral.eval()
+      val castedValue = Cast(columnValueLiteral, dataType, Option(timeZone.getID)).eval()
+      if (validatePartitionColumns && columnValue != null && castedValue == null) {
+        throw new RuntimeException(s"Failed to cast value `$columnValue` to `$dataType` " +
+          s"for partition column `$columnName`")
+      }
       Literal.create(castedValue, userSpecifiedDataTypes(columnName))
--- End diff --

`Literal.create(castedValue, dataType)`
[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/22514

@cloud-fan I've updated the PR description. Thanks.
[GitHub] spark issue #23215: [SPARK-26263][SQL] Validate partition values with user p...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23215

Sounds good to me too. As there is a config, it is good that we can still disable it.
[GitHub] spark pull request #23215: [SPARK-26263][SQL] Validate partition values with...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23215#discussion_r239388954

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala ---
@@ -272,9 +279,13 @@ object PartitioningUtils {
     val literal = if (userSpecifiedDataTypes.contains(columnName)) {
       // SPARK-26188: if user provides corresponding column schema, get the column value without
       // inference, and then cast it as user specified data type.
-      val columnValue = inferPartitionColumnValue(rawColumnValue, false, timeZone)
-      val castedValue =
-        Cast(columnValue, userSpecifiedDataTypes(columnName), Option(timeZone.getID)).eval()
+      val dataType = userSpecifiedDataTypes(columnName)
+      val columnValueLiteral = inferPartitionColumnValue(rawColumnValue, false, timeZone)
+      val columnValue = columnValueLiteral.eval()
+      val castedValue = Cast(columnValueLiteral, dataType, Option(timeZone.getID)).eval()
+      if (validatePartitionColumns && columnValue != null && castedValue == null) {
+        throw new RuntimeException(s"Failed to cast partition value `$columnValue` to $dataType")
--- End diff --

Can we also show `columnName` in this exception message? That would make it easier to know which partition column has such an error.
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r239323943

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
@@ -95,9 +77,116 @@ case class CreateHiveTableAsSelectCommand(
     Seq.empty[Row]
   }

+  // Returns `DataWritingCommand` used to write data when the table exists.
+  def writingCommandForExistingTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand
+
+  // Returns `DataWritingCommand` used to write data when the table doesn't exist.
+  def writingCommandForNewTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand
+
   override def argString: String = {
     s"[Database:${tableDesc.database}, " +
     s"TableName: ${tableDesc.identifier.table}, " +
     s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+    tableDesc: CatalogTable,
+    query: LogicalPlan,
+    outputColumnNames: Seq[String],
+    mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def writingCommandForExistingTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand = {
+    InsertIntoHiveTable(
+      tableDesc,
+      Map.empty,
--- End diff --

https://github.com/apache/spark/blob/8534d753ecb21ea64ffbaefb5eaca38ba0464c6d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala#L686-L697
[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23213

I think so; I don't know if @cloud-fan or @mgaido91 have other opinions.
[GitHub] spark issue #23236: [SPARK-26275][PYTHON][ML] Increases timeout for Streamin...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23236

> Compared to the error values from the test failures above, they match up until the 10th batch but then these continue until the 16th where it has a timeout

I suspect that might be because, when resource usage is heavy, `StreamingLogisticRegressionWithSGD`'s training on the input batch stream can't always keep up with the prediction batch stream, so the model hasn't reached the expected improvement in error yet.
[GitHub] spark pull request #23239: [SPARK-26021][SQL][followup] only deal with NaN a...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23239#discussion_r239302780

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java ---
@@ -198,11 +198,45 @@ protected final void writeLong(long offset, long value) {
     Platform.putLong(getBuffer(), offset, value);
   }

+  // We need to take care of NaN and -0.0 in several places:
+  //   1. When compare values, different NaNs should be treated as same, `-0.0` and `0.0` should be
+  //      treated as same.
+  //   2. In range partitioner, different NaNs should belong to the same partition, -0.0 and 0.0
--- End diff --

Do we already have a related test for case 2?
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r239300131

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
@@ -95,9 +77,116 @@ case class CreateHiveTableAsSelectCommand(
     Seq.empty[Row]
   }

+  // Returns `DataWritingCommand` used to write data when the table exists.
+  def writingCommandForExistingTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand
+
+  // Returns `DataWritingCommand` used to write data when the table doesn't exist.
+  def writingCommandForNewTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand
+
   override def argString: String = {
     s"[Database:${tableDesc.database}, " +
     s"TableName: ${tableDesc.identifier.table}, " +
     s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+    tableDesc: CatalogTable,
+    query: LogicalPlan,
+    outputColumnNames: Seq[String],
+    mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def writingCommandForExistingTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand = {
+    InsertIntoHiveTable(
+      tableDesc,
+      Map.empty,
--- End diff --

I think it is because we don't allow a Hive CTAS statement to create a partitioned table. I saw there is one test for it in Hive's SQLQuerySuite.
[GitHub] spark issue #23236: [SPARK-26275][PYTHON][ML] Increases timeout for Streamin...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23236 I agree with @BryanCutler's analysis, and a few things about this test do look a bit weird. I also think it is fine to increase the timeout for now.
[GitHub] spark pull request #23231: [SPARK-26273][ML] Add OneHotEncoderEstimator as a...
Github user viirya closed the pull request at: https://github.com/apache/spark/pull/23231
[GitHub] spark issue #23231: [SPARK-26273][ML] Add OneHotEncoderEstimator as alias to...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23231 Then let me close this now.
[GitHub] spark issue #23231: [SPARK-26273][ML] Add OneHotEncoderEstimator as alias to...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23231 Ok. Maybe we can add a few words to the ML migration guide to clearly announce this.
[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23213 `wholeStage=false, factoryMode=CODEGEN_ONLY` and `wholeStage=false, factoryMode=NO_CODEGEN` should give more complete test coverage for `GenerateUnsafeProjection`, `GenerateMutableProjection`, etc.
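A hedged sketch of the configuration cross-product being discussed (mirroring the shape of the `SQLQueryTestSuite` diff quoted in a later entry below; treat the exact values as assumptions rather than the merged code):

```scala
import org.apache.spark.sql.internal.SQLConf

// Three combinations: whole-stage codegen on, plus wholeStage=false with
// compiled or interpreted expressions. The latter two exercise
// GenerateUnsafeProjection, GenerateMutableProjection, etc. directly.
val codegenConfigSets = Array(
  ("true", "CODEGEN_ONLY"),
  ("false", "CODEGEN_ONLY"),
  ("false", "NO_CODEGEN")
).map { case (wholeStage, factoryMode) =>
  Array(
    SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStage,
    SQLConf.CODEGEN_FACTORY_MODE.key -> factoryMode)
}
```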
[GitHub] spark issue #23231: [SPARK-26273][ML] Add OneHotEncoderEstimator as alias to...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23231 It is because the ML migration guide claims that we will keep OneHotEncoderEstimator as an alias. I'm fine if we now have consensus that we can avoid such an alias.
[GitHub] spark issue #23230: [SPARK-26133][ML][Followup] Fix doc for OneHotEncoder
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23230 Thanks @HyukjinKwon
[GitHub] spark pull request #23231: [SPARK-26273][ML] Add OneHotEncoderEstimator as a...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23231#discussion_r239011539 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.apache.spark.annotation.Since +import org.apache.spark.ml.Estimator +import org.apache.spark.ml.param._ +import org.apache.spark.ml.util._ +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.types.StructType + +/** + * A one-hot encoder that maps a column of category indices to a column of binary vectors, with + * at most a single one-value per row that indicates the input category index. + * For example with 5 categories, an input value of 2.0 would map to an output vector of + * `[0.0, 0.0, 1.0, 0.0]`. + * The last category is not included by default (configurable via `dropLast`), + * because it makes the vector entries sum up to one, and hence linearly dependent. + * So an input value of 4.0 maps to `[0.0, 0.0, 0.0, 0.0]`. + * + * @note This is different from scikit-learn's OneHotEncoder, which keeps all categories. + * The output vectors are sparse. + * + * When `handleInvalid` is configured to 'keep', an extra "category" indicating invalid values is + * added as last category. So when `dropLast` is true, invalid values are encoded as all-zeros + * vector. + * + * @note When encoding multi-column by using `inputCols` and `outputCols` params, input/output cols + * come in pairs, specified by the order in the arrays, and each pair is treated independently. + * + * @note `OneHotEncoderEstimator` is renamed to `OneHotEncoder` in 3.0.0. This + * `OneHotEncoderEstimator` is kept as an alias and will be removed in further version. + * + * @see `StringIndexer` for converting categorical values into category indices + */ +@Since("2.3.0") --- End diff -- These Since tags are from the original OneHotEncoderEstimator.
[GitHub] spark issue #23231: [SPARK-26273][ML] Add OneHotEncoderEstimator as alias to...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23231 cc @srowen @dbtsai
[GitHub] spark pull request #23231: [SPARK-26273][ML] Add OneHotEncoderEstimator as a...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23231#discussion_r239008438 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderEstimatorSuite.scala --- @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.apache.spark.ml.attribute.{AttributeGroup, BinaryAttribute, NominalAttribute} +import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT} +import org.apache.spark.ml.param.ParamsSuite +import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest} +import org.apache.spark.sql.{Encoder, Row} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types._ + +class OneHotEncoderEstimatorSuite extends MLTest with DefaultReadWriteTest { --- End diff -- The fitting in OneHotEncoderEstimator is actually done by OneHotEncoder; OneHotEncoderEstimator is just an alias. I'm not sure we really need to add this test suite for it.
[GitHub] spark pull request #23231: [SPARK-26273][ML] Add OneHotEncoderEstimator as a...
GitHub user viirya opened a pull request: https://github.com/apache/spark/pull/23231 [SPARK-26273][ML] Add OneHotEncoderEstimator as alias to OneHotEncoder ## What changes were proposed in this pull request? SPARK-26133 removed the deprecated OneHotEncoder and renamed OneHotEncoderEstimator to OneHotEncoder. Based on the ML migration guide, we need to keep OneHotEncoderEstimator as an alias to OneHotEncoder. This PR adds it. ## How was this patch tested? Added tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/viirya/spark-1 one-hot-encoder-estimator-alias Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23231.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23231 commit 17160710cadc49b54f4385ae3ca9ddb0eb4034b0 Author: Liang-Chi Hsieh Date: 2018-12-05T09:27:58Z Add OneHotEncoderEstimator as alias to OneHotEncoder.
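For reference, a minimal sketch of what such an alias could look like (constructor shape and uid prefix are assumptions; the real class would also carry `Since` annotations, read/write support, and a deprecation note):

```scala
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.ml.util.Identifiable

// Sketch only: keep the 2.3 name compiling as a thin subclass of the
// renamed OneHotEncoder, so user code referencing the old name still works.
class OneHotEncoderEstimator(override val uid: String) extends OneHotEncoder(uid) {
  def this() = this(Identifiable.randomUID("oneHotEncoderEstimator"))
}
```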
[GitHub] spark issue #23230: [SPARK-26133][ML][Followup] Fix doc for OneHotEncoder
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23230 cc @HyukjinKwon @srowen
[GitHub] spark pull request #23230: [SPARK-26133][ML][Followup] Fix doc for OneHotEnc...
GitHub user viirya opened a pull request: https://github.com/apache/spark/pull/23230 [SPARK-26133][ML][Followup] Fix doc for OneHotEncoder ## What changes were proposed in this pull request? This fixes the documentation of the renamed OneHotEncoder in PySpark. ## How was this patch tested? N/A You can merge this pull request into a Git repository by running: $ git pull https://github.com/viirya/spark-1 remove_one_hot_encoder_followup Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23230.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23230 commit c84886aef9a53d0d58ca4f0f68ece57ee80f88c8 Author: Liang-Chi Hsieh Date: 2018-12-05T10:08:01Z Fix doc for OneHotEncoder.
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r238909363 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -181,62 +180,39 @@ case class RelationConversions( conf: SQLConf, sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] { private def isConvertible(relation: HiveTableRelation): Boolean = { -val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) -serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) || - serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC) +isConvertible(relation.tableMeta) } - // Return true for Apache ORC and Hive ORC-related configuration names. - // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`. - private def isOrcProperty(key: String) = -key.startsWith("orc.") || key.contains(".orc.") - - private def isParquetProperty(key: String) = -key.startsWith("parquet.") || key.contains(".parquet.") - - private def convert(relation: HiveTableRelation): LogicalRelation = { -val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) - -// Consider table and storage properties. For properties existing in both sides, storage -// properties will supersede table properties. -if (serde.contains("parquet")) { - val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++ -relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA -> - conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString) - sessionCatalog.metastoreCatalog -.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet") -} else { - val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++ -relation.tableMeta.storage.properties - if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") { -sessionCatalog.metastoreCatalog.convertToLogicalRelation( - relation, - options, - classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat], - "orc") - } else { -sessionCatalog.metastoreCatalog.convertToLogicalRelation( - relation, - options, - classOf[org.apache.spark.sql.hive.orc.OrcFileFormat], - "orc") - } -} + private def isConvertible(tableMeta: CatalogTable): Boolean = { +val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) +serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) || + serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC) } + private val metastoreCatalog = sessionCatalog.metastoreCatalog + override def apply(plan: LogicalPlan): LogicalPlan = { plan resolveOperators { // Write path case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists) // Inserting into partitioned table is not supported in Parquet/Orc data source (yet). if query.resolved && DDLUtils.isHiveTable(r.tableMeta) && !r.isPartitioned && isConvertible(r) => -InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists) +InsertIntoTable(metastoreCatalog.convert(r), partition, + query, overwrite, ifPartitionNotExists) // Read path case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) => -convert(relation) +metastoreCatalog.convert(relation) + + // CTAS + case CreateTable(tableDesc, mode, Some(query)) + if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty && +isConvertible(tableDesc) => --- End diff -- ok. 
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r238902415 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -181,62 +180,39 @@ case class RelationConversions( conf: SQLConf, sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] { private def isConvertible(relation: HiveTableRelation): Boolean = { -val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) -serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) || - serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC) +isConvertible(relation.tableMeta) } - // Return true for Apache ORC and Hive ORC-related configuration names. - // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`. - private def isOrcProperty(key: String) = -key.startsWith("orc.") || key.contains(".orc.") - - private def isParquetProperty(key: String) = -key.startsWith("parquet.") || key.contains(".parquet.") - - private def convert(relation: HiveTableRelation): LogicalRelation = { -val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) - -// Consider table and storage properties. For properties existing in both sides, storage -// properties will supersede table properties. -if (serde.contains("parquet")) { - val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++ -relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA -> - conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString) - sessionCatalog.metastoreCatalog -.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet") -} else { - val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++ -relation.tableMeta.storage.properties - if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") { -sessionCatalog.metastoreCatalog.convertToLogicalRelation( - relation, - options, - classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat], - "orc") - } else { -sessionCatalog.metastoreCatalog.convertToLogicalRelation( - relation, - options, - classOf[org.apache.spark.sql.hive.orc.OrcFileFormat], - "orc") - } -} + private def isConvertible(tableMeta: CatalogTable): Boolean = { +val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) +serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) || + serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC) } + private val metastoreCatalog = sessionCatalog.metastoreCatalog + override def apply(plan: LogicalPlan): LogicalPlan = { plan resolveOperators { // Write path case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists) // Inserting into partitioned table is not supported in Parquet/Orc data source (yet). 
if query.resolved && DDLUtils.isHiveTable(r.tableMeta) && !r.isPartitioned && isConvertible(r) => -InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists) +InsertIntoTable(metastoreCatalog.convert(r), partition, + query, overwrite, ifPartitionNotExists) // Read path case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) => -convert(relation) +metastoreCatalog.convert(relation) + + // CTAS + case CreateTable(tableDesc, mode, Some(query)) + if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty && +isConvertible(tableDesc) => --- End diff -- hmm, the optimization is already controlled by configs like `HiveUtils.CONVERT_METASTORE_ORC` and `HiveUtils.CONVERT_METASTORE_PARQUET`. Do we need another config for it?
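For context, these are the existing switches being referred to (the config keys backing `HiveUtils.CONVERT_METASTORE_PARQUET` and `HiveUtils.CONVERT_METASTORE_ORC`; `spark` is assumed to be an active session):

```scala
// Turning either flag off keeps the corresponding tables (and, with this
// PR, their CTAS) on the original Hive serde path instead of the
// converted data source path.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "false")
```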
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r238707304 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala --- @@ -95,9 +77,127 @@ case class CreateHiveTableAsSelectCommand( Seq.empty[Row] } + // Returns `DataWritingCommand` used to write data when the table exists. + def writingCommandForExistingTable( +catalog: SessionCatalog, +tableDesc: CatalogTable): DataWritingCommand + + // Returns `DataWritingCommand` used to write data when the table doesn't exist. + def writingCommandForNewTable( +catalog: SessionCatalog, +tableDesc: CatalogTable): DataWritingCommand + override def argString: String = { s"[Database:${tableDesc.database}, " + s"TableName: ${tableDesc.identifier.table}, " + s"InsertIntoHiveTable]" } } + +/** + * Create table and insert the query result into it. + * + * @param tableDesc the table description, which may contain serde, storage handler etc. + * @param query the query whose result will be insert into the new relation + * @param mode SaveMode + */ +case class CreateHiveTableAsSelectCommand( +tableDesc: CatalogTable, +query: LogicalPlan, +outputColumnNames: Seq[String], +mode: SaveMode) + extends CreateHiveTableAsSelectBase { + + override def writingCommandForExistingTable( + catalog: SessionCatalog, + tableDesc: CatalogTable): DataWritingCommand = { +InsertIntoHiveTable( + tableDesc, + Map.empty, + query, + overwrite = false, + ifPartitionNotExists = false, + outputColumnNames = outputColumnNames) + } + + override def writingCommandForNewTable( + catalog: SessionCatalog, + tableDesc: CatalogTable): DataWritingCommand = { +// For CTAS, there is no static partition values to insert. +val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap +InsertIntoHiveTable( + tableDesc, + partition, + query, + overwrite = true, + ifPartitionNotExists = false, + outputColumnNames = outputColumnNames) + } +} + +/** + * Create table and insert the query result into it. This creates Hive table but inserts + * the query result into it by using data source. + * + * @param tableDesc the table description, which may contain serde, storage handler etc. + * @param query the query whose result will be insert into the new relation + * @param mode SaveMode + */ +case class OptimizedCreateHiveTableAsSelectCommand( +tableDesc: CatalogTable, +query: LogicalPlan, +outputColumnNames: Seq[String], +mode: SaveMode) + extends CreateHiveTableAsSelectBase { + + private def getHadoopRelation( + catalog: SessionCatalog, + tableDesc: CatalogTable): HadoopFsRelation = { +val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog +val hiveTable = DDLUtils.readHiveTable(tableDesc) + +metastoreCatalog.convert(hiveTable) match { + case LogicalRelation(t: HadoopFsRelation, _, _, _) => t + case _ => throw new AnalysisException(s"$tableIdentifier should be converted to " + +"HadoopFsRelation.") +} + } + + override def writingCommandForExistingTable( + catalog: SessionCatalog, + tableDesc: CatalogTable): DataWritingCommand = { +val hadoopRelation = getHadoopRelation(catalog, tableDesc) +InsertIntoHadoopFsRelationCommand( + hadoopRelation.location.rootPaths.head, + Map.empty, // We don't support to convert partitioned table. + false, + Seq.empty, // We don't support to convert partitioned table. 
+ hadoopRelation.bucketSpec, + hadoopRelation.fileFormat, + hadoopRelation.options, + query, + mode, + Some(tableDesc), + Some(hadoopRelation.location), + query.output.map(_.name)) + } + + override def writingCommandForNewTable( + catalog: SessionCatalog, + tableDesc: CatalogTable): DataWritingCommand = { +val hadoopRelation = getHadoopRelation(catalog, tableDesc) +InsertIntoHadoopFsRelationCommand( + hadoopRelation.location.rootPaths.head, + Map.empty, // We don't support to convert partitioned table. + false, + Seq.empty, // We don't support to convert partitioned table. + hadoopRelation.bucketSpec, + hadoopRelation.fileFormat, + hadoopRelation.options, + query, + mode, + Some(tableDesc), + Some(hadoopRelation.location), + query.output.map(_.name)) + } +}
[GitHub] spark pull request #23213: [SPARK-26262][SQL] Run SQLQueryTestSuite with WHO...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23213#discussion_r238696052 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala --- @@ -144,9 +144,10 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext { val (comments, code) = input.split("\n").partition(_.startsWith("--")) // Runs all the tests on both codegen-only and interpreter modes -val codegenConfigSets = Array(CODEGEN_ONLY, NO_CODEGEN).map { - case codegenFactoryMode => -Array(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenFactoryMode.toString) +val codegenConfigSets = Array(("false", "NO_CODEGEN"), ("true", "CODEGEN_ONLY")).map { --- End diff -- Will this increase test time too much?
[GitHub] spark pull request #23213: [SPARK-26262][SQL] Run SQLQueryTestSuite with WHO...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23213#discussion_r238695610 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala --- @@ -2899,6 +2899,144 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } } + + private def checkKeywordsExistsInExplain(df: DataFrame, keywords: String*): Unit = { +val output = new java.io.ByteArrayOutputStream() +Console.withOut(output) { + df.explain(extended = true) +} +val normalizedOutput = output.toString.replaceAll("#\\d+", "#x") +for (key <- keywords) { + assert(normalizedOutput.contains(key)) +} + } + + test("optimized plan should show the rewritten aggregate expression") { --- End diff -- Please update the PR description too.
[GitHub] spark issue #23214: [SPARK-26155] Optimizing the performance of LongToUnsafe...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23214 Thanks for doing this. I think we are getting closer to the root cause.
[GitHub] spark issue #23204: Revert "[SPARK-21052][SQL] Add hash map metrics to join"
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23204 Is this observable in general hash join queries, besides TPC-DS Q19?
[GitHub] spark pull request #23204: Revert "[SPARK-21052][SQL] Add hash map metrics t...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23204#discussion_r238270550 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala --- @@ -483,8 +470,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap */ def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = { if (isDense) { - numKeyLookups += 1 - numProbes += 1 --- End diff -- If, as your test shows, this is the cause of the performance regression, we can just revert this and the related changes. The changes in `HashAggregateExec`, etc. can be kept.
[GitHub] spark issue #23203: [SPARK-26252][PYTHON] Add support to run specific unitte...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23203 I haven't looked closely at the changes yet, but I think this should be very useful. Thanks @HyukjinKwon
[GitHub] spark pull request #23184: [SPARK-26227][R] from_[csv|json] should accept sc...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23184#discussion_r238065068 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala --- @@ -225,4 +225,10 @@ private[sql] object SQLUtils extends Logging { } sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray } + + def createArrayType(elementType: DataType): ArrayType = DataTypes.createArrayType(elementType) + + def createArrayType(elementType: Column): ArrayType = { +new ArrayType(ExprUtils.evalTypeExpr(elementType.expr), true) --- End diff -- oh, I see.
[GitHub] spark pull request #23184: [SPARK-26227][R] from_[csv|json] should accept sc...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23184#discussion_r237896514 --- Diff: R/pkg/R/functions.R --- @@ -202,8 +202,9 @@ NULL #' \itemize{ #' \item \code{from_json}: a structType object to use as the schema to use #' when parsing the JSON string. Since Spark 2.3, the DDL-formatted string is -#' also supported for the schema. +#' also supported for the schema. Since Spark 3.0, \code{schema_of_json} or +#' a string literal can also be accepted. --- End diff -- What does `a string literal` mean here? Is it different from a DDL-formatted string?
[GitHub] spark pull request #23184: [SPARK-26227][R] from_[csv|json] should accept sc...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23184#discussion_r237898787 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala --- @@ -225,4 +225,10 @@ private[sql] object SQLUtils extends Logging { } sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray } + + def createArrayType(elementType: DataType): ArrayType = DataTypes.createArrayType(elementType) + + def createArrayType(elementType: Column): ArrayType = { --- End diff -- `column` instead of `elementType`?
[GitHub] spark pull request #23184: [SPARK-26227][R] from_[csv|json] should accept sc...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23184#discussion_r237899057 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala --- @@ -225,4 +225,10 @@ private[sql] object SQLUtils extends Logging { } sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray } + + def createArrayType(elementType: DataType): ArrayType = DataTypes.createArrayType(elementType) + + def createArrayType(elementType: Column): ArrayType = { +new ArrayType(ExprUtils.evalTypeExpr(elementType.expr), true) --- End diff -- Instead of `true`, can we use `elementType.expr.nullable`?
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237818279 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,35 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. + * + * This method should be used (instead of `sameResult`) when comparing if 2 expressions are the + * same and one can replace the other (eg. in Optimizer/Analyzer rules where we want to replace + * equivalent expressions). It should not be used (and `sameResult` should be used instead) when + * comparing if 2 expressions produce the same results (in this case `semanticEquals` can be too + * strict). */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. --- End diff -- How about replacing `output` with `output from plan perspective`? ``` Returns true when two expressions will always compute the same result, even if the output from plan perspective may be different, because of different names or similar differences. ```
[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/22514 retest this please.
[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23152#discussion_r237768463 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala --- @@ -2276,4 +2276,16 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } + + test("SPARK-26181 hasMinMaxStats method of ColumnStatsMap is not correct") { +withSQLConf(SQLConf.CBO_ENABLED.key -> "true") { + withTable("all_null") { +sql("create table all_null (attrInt int)") +sql("insert into all_null values (null)") +sql("analyze table all_null compute statistics for columns attrInt") +checkAnswer(sql("select * from all_null where attrInt < 1"), Nil) --- End diff -- This test can pass without this patch.
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r237749421 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala --- @@ -92,4 +92,18 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton { } } } + + test("SPARK-25271: write empty map into hive parquet table") { --- End diff -- Added a new test for that.
[GitHub] spark issue #22957: [SPARK-25951][SQL] Ignore aliases for distributions and ...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/22957 Btw, I think we can update the PR title and description to reflect the new changes.
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237749287 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -780,6 +780,23 @@ class PlannerSuite extends SharedSQLContext { classOf[PartitioningCollection]) } } + + test("SPARK-25951: avoid redundant shuffle on rename") { --- End diff -- +1 if possible.
[GitHub] spark issue #22957: [SPARK-25951][SQL] Ignore aliases for distributions and ...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/22957 This looks good to me. Just a comment about wording.
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237747550 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,35 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. + * + * This method should be used (instead of `sameResult`) when comparing if 2 expressions are the + * same and one can replace the other (eg. in Optimizer/Analyzer rules where we want to replace + * equivalent expressions). It should not be used (and `sameResult` should be used instead) when + * comparing if 2 expressions produce the same results (in this case `semanticEquals` can be too + * strict). */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. --- End diff -- I think `output` here is a bit confusing. Do we mean the output names?
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237747770 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,35 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. + * + * This method should be used (instead of `sameResult`) when comparing if 2 expressions are the + * same and one can replace the other (eg. in Optimizer/Analyzer rules where we want to replace + * equivalent expressions). It should not be used (and `sameResult` should be used instead) when + * comparing if 2 expressions produce the same results (in this case `semanticEquals` can be too + * strict). */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. --- End diff -- So `sameResult` returns whether the evaluated results of two expressions are exactly the same?
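A hedged illustration of the split being documented here, assuming the semantics proposed in this PR (that `sameResult` ignores cosmetic output differences such as alias names, while `semanticEquals` does not):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.types.IntegerType

// Two aliases of the same attribute always evaluate to the same value,
// but their outputs differ by name and exprId, so one cannot blindly
// replace the other in a plan.
val a = AttributeReference("a", IntegerType)()
val x = Alias(a, "x")()
val y = Alias(a, "y")()
// Expected under this PR: x.sameResult(y) is true (same computed result),
// while x.semanticEquals(y) stays false (different output).
```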
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r237747152 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -181,62 +180,39 @@ case class RelationConversions( conf: SQLConf, sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] { private def isConvertible(relation: HiveTableRelation): Boolean = { -val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) -serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) || - serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC) +isConvertible(relation.tableMeta) } - // Return true for Apache ORC and Hive ORC-related configuration names. - // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`. - private def isOrcProperty(key: String) = -key.startsWith("orc.") || key.contains(".orc.") - - private def isParquetProperty(key: String) = -key.startsWith("parquet.") || key.contains(".parquet.") - - private def convert(relation: HiveTableRelation): LogicalRelation = { -val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) - -// Consider table and storage properties. For properties existing in both sides, storage -// properties will supersede table properties. -if (serde.contains("parquet")) { - val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++ -relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA -> - conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString) - sessionCatalog.metastoreCatalog -.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet") -} else { - val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++ -relation.tableMeta.storage.properties - if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") { -sessionCatalog.metastoreCatalog.convertToLogicalRelation( - relation, - options, - classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat], - "orc") - } else { -sessionCatalog.metastoreCatalog.convertToLogicalRelation( - relation, - options, - classOf[org.apache.spark.sql.hive.orc.OrcFileFormat], - "orc") - } -} + private def isConvertible(tableMeta: CatalogTable): Boolean = { +val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) +serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) || + serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC) } + private val metastoreCatalog = sessionCatalog.metastoreCatalog + override def apply(plan: LogicalPlan): LogicalPlan = { plan resolveOperators { // Write path case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists) // Inserting into partitioned table is not supported in Parquet/Orc data source (yet). 
if query.resolved && DDLUtils.isHiveTable(r.tableMeta) && !r.isPartitioned && isConvertible(r) => -InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists) +InsertIntoTable(metastoreCatalog.convert(r), partition, + query, overwrite, ifPartitionNotExists) // Read path case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) => -convert(relation) +metastoreCatalog.convert(relation) + + // CTAS + case CreateTable(tableDesc, mode, Some(query)) + if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty && +isConvertible(tableDesc) => +DDLUtils.checkDataColNames(tableDesc) --- End diff -- In HiveAnalysis, when transforming CreateTable to CreateHiveTableAsSelectCommand, we do this too. `checkDataColNames` checks whether any invalid character is used in a column name.
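As a hedged illustration of what `checkDataColNames` guards against (the table and column names are made up; the invalid-character set comes from the underlying file formats, e.g. Parquet rejects ',' in field names):

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.scalatest.Assertions._

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
// Expected to fail analysis: the converted CTAS path must keep running
// the same column-name check that HiveAnalysis runs.
val e = intercept[AnalysisException] {
  spark.sql("CREATE TABLE bad_name STORED AS PARQUET AS SELECT 1 AS `a,b`")
}
```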
[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/22514 Screenshot of the SQL UI: https://user-images.githubusercontent.com/68855/49268483-aaa6d000-f49a-11e8-92c3-5ee78012fe9e.png
[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23152#discussion_r237721273 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -879,13 +879,13 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) { } def hasCountStats(a: Attribute): Boolean = -get(a).map(_.hasCountStats).getOrElse(false) +get(a).exists(_.hasCountStats) def hasDistinctCount(a: Attribute): Boolean = -get(a).map(_.distinctCount.isDefined).getOrElse(false) +get(a).exists(_.distinctCount.isDefined) def hasMinMaxStats(a: Attribute): Boolean = -get(a).map(_.hasCountStats).getOrElse(false) +get(a).exists(_.hasMinMaxStats) --- End diff -- @liancheng Thanks. @adrian-wang Can you also add a test based on that?
[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23152#discussion_r237720737 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -879,13 +879,13 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) { } def hasCountStats(a: Attribute): Boolean = -get(a).map(_.hasCountStats).getOrElse(false) +get(a).exists(_.hasCountStats) def hasDistinctCount(a: Attribute): Boolean = -get(a).map(_.distinctCount.isDefined).getOrElse(false) +get(a).exists(_.distinctCount.isDefined) def hasMinMaxStats(a: Attribute): Boolean = -get(a).map(_.hasCountStats).getOrElse(false) +get(a).exists(_.hasMinMaxStats) --- End diff -- So this is still an actual bug.
[GitHub] spark issue #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of `Colum...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/23152 Agreed. But it looks like the added test failed.
[GitHub] spark pull request #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts,...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23171#discussion_r237402055 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -335,6 +343,41 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate { """.stripMargin) } + private def genCodeWithSwitch(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val (nullLiterals, nonNullLiterals) = list.partition { + case Literal(null, _) => true + case _ => false +} +val listGen = nonNullLiterals.map(_.genCode(ctx)) +val valueGen = value.genCode(ctx) + +val caseBranches = listGen.map(literal => --- End diff -- style: ```scala listGen.map { literal => ... } ```
[GitHub] spark pull request #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts,...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23171#discussion_r237405465 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -335,6 +343,41 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate { """.stripMargin) } + private def genCodeWithSwitch(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val (nullLiterals, nonNullLiterals) = list.partition { + case Literal(null, _) => true + case _ => false +} +val listGen = nonNullLiterals.map(_.genCode(ctx)) +val valueGen = value.genCode(ctx) + +val caseBranches = listGen.map(literal => + s""" + |case ${literal.value}: + | ${ev.value} = true; + | break; + """.stripMargin) + +ev.copy(code = + code""" + |${valueGen.code} + |${CodeGenerator.JAVA_BOOLEAN} ${ev.isNull} = ${valueGen.isNull}; + |${CodeGenerator.JAVA_BOOLEAN} ${ev.value} = false; + |if (!${valueGen.isNull}) { + | switch (${valueGen.value}) { + |${caseBranches.mkString("")} + |default: + | ${ev.isNull} = ${nullLiterals.nonEmpty}; + | } + |} + """.stripMargin) + } + + private def isSwitchCompatible: Boolean = list.forall { +case Literal(_, dt) => dt == ByteType || dt == ShortType || dt == IntegerType --- End diff -- Can it be simplified to the following? ```scala private def isSwitchCompatible: Boolean = { inSetConvertible && (value.dataType == ByteType || value.dataType == ShortType || value.dataType == IntegerType) } ```
[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23176#discussion_r237398085 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -367,11 +367,29 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with } @transient lazy val set: Set[Any] = child.dataType match { -case _: AtomicType => hset +case t: AtomicType if !t.isInstanceOf[BinaryType] => hset case _: NullType => hset case _ => + val ord = TypeUtils.getInterpretedOrdering(child.dataType) + val ordering = if (hasNull) { +new Ordering[Any] { + override def compare(x: Any, y: Any): Int = { --- End diff -- Or simply filter nulls out of the tree set, per @cloud-fan's idea.
[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23176#discussion_r237397442 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -367,11 +367,29 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with } @transient lazy val set: Set[Any] = child.dataType match { -case _: AtomicType => hset +case t: AtomicType if !t.isInstanceOf[BinaryType] => hset case _: NullType => hset case _ => + val ord = TypeUtils.getInterpretedOrdering(child.dataType) + val ordering = if (hasNull) { +new Ordering[Any] { + override def compare(x: Any, y: Any): Int = { --- End diff -- InSet overrides nullSafeEval, and for codegen we look into `set` only if `!ev.isNull`, so I think we only need to consider the case where one side is null.
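A minimal sketch of the simpler alternative discussed here (assuming `child` and `hset` as in `InSet`; not the merged code): drop nulls before building the tree set, so the interpreted ordering never has to compare against null.

```scala
import scala.collection.immutable.TreeSet
import org.apache.spark.sql.catalyst.util.TypeUtils

// Nulls are already handled by nullSafeEval / the isNull check in codegen,
// so the set only ever sees non-null values.
val ord = TypeUtils.getInterpretedOrdering(child.dataType)
val set = TreeSet.empty[Any](ord) ++ hset.filterNot(_ == null)
```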
[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23152#discussion_r237382531 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -879,13 +879,13 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) { } def hasCountStats(a: Attribute): Boolean = -get(a).map(_.hasCountStats).getOrElse(false) +get(a).exists(_.hasCountStats) def hasDistinctCount(a: Attribute): Boolean = -get(a).map(_.distinctCount.isDefined).getOrElse(false) +get(a).exists(_.distinctCount.isDefined) def hasMinMaxStats(a: Attribute): Boolean = -get(a).map(_.hasCountStats).getOrElse(false) +get(a).exists(_.hasMinMaxStats) --- End diff -- Based on https://github.com/apache/spark/pull/23152#discussion_r237359357, the old `hasMinMaxStats` actually works correctly, although it doesn't look like it at a glance.
[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23152#discussion_r237381966 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala --- @@ -821,6 +822,32 @@ class FilterEstimationSuite extends StatsEstimationTestBase { expectedRowCount = 3) } + test("ColumnStatsMap tests") { --- End diff -- Oh I see. So `FilterEstimation` works well without this change.
[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/22514
> can we try a query and see what the SQL UI looks like?

Yes. I will try a query and post the SQL UI.
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r237364946 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala --- @@ -92,4 +92,18 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton { } } } + + test("SPARK-25271: write empty map into hive parquet table") { --- End diff -- Agreed. Now that we have two Hive CTAS commands, it is easier to test. I will add tests later.
[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23152#discussion_r237359890 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala --- @@ -821,6 +822,32 @@ class FilterEstimationSuite extends StatsEstimationTestBase { expectedRowCount = 3) } + test("ColumnStatsMap tests") { --- End diff -- If `hasMinMaxStats` was not correct previously, would `evaluateBinary` work correctly?
[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23152#discussion_r237347496 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala --- @@ -821,6 +822,32 @@ class FilterEstimationSuite extends StatsEstimationTestBase { expectedRowCount = 3) } + test("ColumnStatsMap tests") { --- End diff -- Since this is not correct, I think FilterEstimation didn't work correctly before. Can you also add a related test using validateEstimatedStats, similar to the other tests? In the test, we should rely on the min and max stats to do the filter estimation.
[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/22514 Yeah, let's see if the retest works well.