[GitHub] spark pull request #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesM...

2018-12-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23272#discussion_r240204508
  
--- Diff: 
core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
 ---
@@ -667,4 +669,54 @@ public void testPeakMemoryUsed() {
 }
   }
 
+  @Test
+  public void avoidDeadlock() throws InterruptedException {
+memoryManager.limit(PAGE_SIZE_BYTES);
+MemoryMode mode = useOffHeapMemoryAllocator() ? MemoryMode.OFF_HEAP: 
MemoryMode.ON_HEAP;
+TestMemoryConsumer c1 = new TestMemoryConsumer(taskMemoryManager, 
mode);
+BytesToBytesMap map =
+  new BytesToBytesMap(taskMemoryManager, blockManager, 
serializerManager, 1, 0.5, 1024);
+
+Runnable memoryConsumer = new Runnable() {
+  @Override
+  public void run() {
+int i = 0;
+long used = 0;
+while (i < 10) {
+  c1.use(1000);
+  used += 1000;
+  i++;
+}
+c1.free(used);
+  }
+};
+
+Thread thread = new Thread(memoryConsumer);
+
+try {
+  int i;
+  for (i = 0; i < 1024; i++) {
+final long[] arr = new long[]{i};
+final BytesToBytesMap.Location loc = map.lookup(arr, 
Platform.LONG_ARRAY_OFFSET, 8);
+loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, 
Platform.LONG_ARRAY_OFFSET, 8);
+  }
+
+  // Starts to require memory at another memory consumer.
+  thread.start();
+
+  BytesToBytesMap.MapIterator iter = map.destructiveIterator();
+  for (i = 0; i < 1024; i++) {
+iter.next();
+  }
+  assertFalse(iter.hasNext());
+} finally {
+  map.free();
+  thread.join();
--- End diff --

This line just makes sure `memoryConsumer` finishes and frees its memory.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesM...

2018-12-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23272#discussion_r240204245
  
--- Diff: 
core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
 ---
@@ -667,4 +669,54 @@ public void testPeakMemoryUsed() {
 }
   }
 
+  @Test
+  public void avoidDeadlock() throws InterruptedException {
+memoryManager.limit(PAGE_SIZE_BYTES);
+MemoryMode mode = useOffHeapMemoryAllocator() ? MemoryMode.OFF_HEAP: 
MemoryMode.ON_HEAP;
+TestMemoryConsumer c1 = new TestMemoryConsumer(taskMemoryManager, 
mode);
+BytesToBytesMap map =
+  new BytesToBytesMap(taskMemoryManager, blockManager, 
serializerManager, 1, 0.5, 1024);
+
+Runnable memoryConsumer = new Runnable() {
+  @Override
+  public void run() {
+int i = 0;
+long used = 0;
+while (i < 10) {
+  c1.use(1000);
+  used += 1000;
+  i++;
+}
+c1.free(used);
+  }
+};
+
+Thread thread = new Thread(memoryConsumer);
+
+try {
+  int i;
+  for (i = 0; i < 1024; i++) {
+final long[] arr = new long[]{i};
+final BytesToBytesMap.Location loc = map.lookup(arr, 
Platform.LONG_ARRAY_OFFSET, 8);
+loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, 
Platform.LONG_ARRAY_OFFSET, 8);
+  }
+
+  // Starts to require memory at another memory consumer.
+  thread.start();
+
+  BytesToBytesMap.MapIterator iter = map.destructiveIterator();
+  for (i = 0; i < 1024; i++) {
+iter.next();
+  }
+  assertFalse(iter.hasNext());
+} finally {
+  map.free();
+  thread.join();
--- End diff --

Without this line, the test still hangs. The test thread is deadlocked with 
the other thread, which is running `memoryConsumer`.


---




[GitHub] spark pull request #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesM...

2018-12-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23272#discussion_r240187707
  
--- Diff: 
core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java ---
@@ -38,12 +38,14 @@ public long spill(long size, MemoryConsumer trigger) 
throws IOException {
 return used;
   }
 
-  void use(long size) {
+  @VisibleForTesting
--- End diff --

Ok.


---




[GitHub] spark pull request #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesM...

2018-12-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23272#discussion_r240187678
  
--- Diff: 
core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
 ---
@@ -667,4 +668,53 @@ public void testPeakMemoryUsed() {
 }
   }
 
+  @Test
+  public void avoidDeadlock() throws InterruptedException {
--- End diff --

I've tried a few ways to add timeout logic, but none of them work. The 
deadlock hangs the test thread.
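
The comment does not say which timeout approaches were tried. For reference, one generic pattern is to run the body on a separate daemon thread and wait for it with a bounded `Future.get`. The sketch below is only an illustration under that assumption: `TimeoutGuardSketch` and `finishesWithin` are hypothetical names, and a thread blocked on a monitor does not react to interruption, so this only lets the caller stop waiting. Whether it would fit this suite is not established here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Spark or of the pull request.
final class TimeoutGuardSketch {
    /** Runs body on a daemon thread; returns true if it finished within timeoutMillis. */
    static boolean finishesWithin(Runnable body, long timeoutMillis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "guarded-test-body");
            t.setDaemon(true); // a stuck thread must not keep the JVM alive
            return t;
        });
        try {
            Future<?> future = executor.submit(body);
            try {
                future.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                // The body is still running (or blocked); the caller can fail the test.
                return false;
            }
        } finally {
            // Interrupts the worker if it is interruptible; a monitor deadlock is not.
            executor.shutdownNow();
        }
    }
}
```

A test could then assert that `finishesWithin(body, timeout)` returns true instead of blocking on the body directly.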


---




[GitHub] spark issue #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesMap.MapI...

2018-12-10 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23272
  
> have you seen any bug report caused by this dead lock?

The original reporter of the JIRA ticket SPARK-26265 has hit this bug in 
their workload.


---




[GitHub] spark pull request #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesM...

2018-12-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23272#discussion_r240170574
  
--- Diff: 
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---
@@ -255,11 +255,18 @@ private MapIterator(int numRecords, Location loc, 
boolean destructive) {
 }
 
 private void advanceToNextPage() {
+  // SPARK-26265: We will first lock this `MapIterator` and then `TaskMemoryManager` when going
+  // to free a memory page by calling `freePage`. At the same time, it is possibly that another
+  // memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it
+  // acquires memory and causes spilling on this `MapIterator`. To avoid deadlock here, we keep
+  // reference to the page to free and free it after releasing the lock of `MapIterator`.
--- End diff --

Yes, off-heap mode also suffers from this issue. One change is needed for 
this test case to cover it. Thanks.


---




[GitHub] spark issue #23269: [SPARK-26316] Currently the wrong implementation in the ...

2018-12-10 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23269
  
Hmm, I think the PR title is too long. Maybe just `Revert hash join 
metrics that cause performance degradation`?


---




[GitHub] spark issue #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesMap.MapI...

2018-12-10 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23272
  
cc @cloud-fan 


---




[GitHub] spark pull request #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesM...

2018-12-10 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/23272

[SPARK-26265][Core] Fix deadlock in BytesToBytesMap.MapIterator when 
locking both BytesToBytesMap.MapIterator and TaskMemoryManager

## What changes were proposed in this pull request?

In `BytesToBytesMap.MapIterator.advanceToNextPage`, we first lock this 
`MapIterator` and then `TaskMemoryManager` when freeing a memory page by 
calling `freePage`. At the same time, it is possible that another memory 
consumer first locks `TaskMemoryManager` and then this `MapIterator` when it 
acquires memory and causes spilling on this `MapIterator`.

So it ends up with the `MapIterator` object holding its own lock and waiting 
for the lock on `TaskMemoryManager`, while the other consumer holds the lock 
on `TaskMemoryManager` and waits for the lock on the `MapIterator` object.

To avoid the deadlock, this patch proposes to keep a reference to the page 
to free and to free it after releasing the lock on `MapIterator`.
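
The lock-ordering change described above can be sketched as follows. This is a minimal illustration and not the actual patch: `ManagerSketch` and `IteratorSketch` are hypothetical stand-ins for `TaskMemoryManager` and `MapIterator`. The only point shown is that the page reference is captured while holding the iterator lock and the page is freed after that lock is released.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a memory manager guarded by its own monitor.
final class ManagerSketch {
    private long freedPages = 0;

    synchronized void freePage(long[] page) {
        freedPages++;
    }

    synchronized long freedPages() {
        return freedPages;
    }
}

// Hypothetical stand-in for an iterator that owns pages and has its own monitor.
final class IteratorSketch {
    private final ManagerSketch manager;
    private final Deque<long[]> pages = new ArrayDeque<>();
    private long[] currentPage;

    IteratorSketch(ManagerSketch manager, int numPages) {
        this.manager = manager;
        for (int i = 0; i < numPages; i++) {
            pages.add(new long[8]);
        }
    }

    /** Moves to the next page; returns false when no pages remain. */
    boolean advanceToNextPage() {
        long[] pageToFree;
        boolean advanced;
        synchronized (this) {
            // Only remember the page here; do not call into the manager
            // while the iterator lock is held.
            pageToFree = currentPage;
            currentPage = pages.poll();
            advanced = currentPage != null;
        }
        // The iterator lock is released, so taking the manager lock here cannot
        // form a cycle with a thread that holds the manager lock and wants the
        // iterator lock.
        if (pageToFree != null) {
            manager.freePage(pageToFree);
        }
        return advanced;
    }
}
```

With this shape, a thread never holds both monitors in the iterator-then-manager order, which is the order that conflicted with the spilling path.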

## How was this patch tested?

Added a test and manually ran it 100 times to make sure there is no 
deadlock.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 SPARK-26265

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23272.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23272


commit 25e8e068047b714f27706399e1e6c03c338ac178
Author: Liang-Chi Hsieh 
Date:   2018-12-10T07:59:09Z

Fix deadlock in BytesToBytesMap.




---




[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-09 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23248#discussion_r240041688
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] 
with PredicateHelper {
 expressions.flatMap(collectEvaluableUDFs)
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-case plan: LogicalPlan => extract(plan)
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+// SPARK-26293: A subquery will be rewritten into join later, and will 
go through this rule
+// eventually. Here we skip subquery, as Python UDF only needs to be 
extracted once.
+case _: Subquery => plan
--- End diff --

I'm not sure it is entirely OK to skip `Subquery` for all optimizer rules.

For `ExtractPythonUDFs` I think it is OK, because `ExtractPythonUDFs` runs 
after the rules in `RewriteSubquery`. So we can skip `ExtractPythonUDFs` 
here and extract Python UDFs after the subqueries are rewritten into joins.

But for rules that run before `RewriteSubquery`, if we skip them on 
`Subquery`, we have no chance to apply them after the subqueries are 
rewritten into joins.


---




[GitHub] spark pull request #23263: [SPARK-23674][ML] Adds Spark ML Events

2018-12-08 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23263#discussion_r240003952
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/Estimator.scala ---
@@ -65,7 +65,19 @@ abstract class Estimator[M <: Model[M]] extends 
PipelineStage {
* Fits a model to the input data.
*/
   @Since("2.0.0")
-  def fit(dataset: Dataset[_]): M
+  def fit(dataset: Dataset[_]): M = MLEvents.withFitEvent(this, dataset) {
+fitImpl(dataset)
+  }
+
+  /**
+   * `fit()` handles events and then calls this method. Subclasses should 
override this
+   * method to implement the actual fiting a model to the input data.
+   */
+  @Since("3.0.0")
+  protected def fitImpl(dataset: Dataset[_]): M = {
+// Keep this default body for backward compatibility.
+throw new UnsupportedOperationException("fitImpl is not implemented.")
--- End diff --

With the current change, can Spark ML developers still choose to override 
`fit` instead of `fitImpl`, so that their ML model works without ML events?
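
The concern can be illustrated with a small sketch of the same wrapper pattern. The class names below are hypothetical and the event handling is reduced to a list of strings. It shows that a subclass overriding `fitImpl` gets events, while a subclass overriding `fit` directly bypasses them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the fit/fitImpl split in the diff above.
abstract class EstimatorSketch {
    final List<String> events = new ArrayList<>();

    /** Emits start/end events around fitImpl. */
    String fit(String dataset) {
        events.add("fitStart");
        try {
            return fitImpl(dataset);
        } finally {
            events.add("fitEnd");
        }
    }

    /** Default body kept so that subclasses overriding only fit still compile. */
    protected String fitImpl(String dataset) {
        throw new UnsupportedOperationException("fitImpl is not implemented.");
    }
}

// Overrides fitImpl: events are emitted by the inherited fit.
final class NewStyleEstimator extends EstimatorSketch {
    @Override
    protected String fitImpl(String dataset) {
        return "model(" + dataset + ")";
    }
}

// Overrides fit directly: the model still works, but no events are emitted.
final class OldStyleEstimator extends EstimatorSketch {
    @Override
    String fit(String dataset) {
        return "model(" + dataset + ")";
    }
}
```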


---




[GitHub] spark pull request #23263: [SPARK-23674][ML] Adds Spark ML Events

2018-12-08 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23263#discussion_r240003674
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+
+import org.apache.hadoop.fs.Path
+import org.mockito.Matchers.{any, eq => meq}
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
+import org.scalatest.mockito.MockitoSugar.mock
+
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.ml.param.ParamMap
+import org.apache.spark.ml.util._
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql._
+import org.apache.spark.util.Utils
+
+
+class MLEventsSuite
+extends SparkFunSuite
+with BeforeAndAfterEach
+with DefaultReadWriteTest
+with Eventually {
+
+  private var spark: SparkSession = _
+  private var sc: SparkContext = _
+  private var checkpointDir: String = _
+  private var listener: SparkListener = _
+  private val dirName: String = "pipeline"
+  private val events = mutable.ArrayBuffer.empty[MLEvent]
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+sc = new SparkContext("local[2]", "SparkListenerSuite")
+listener = new SparkListener {
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event 
match {
+case e: FitStart[_] => events.append(e)
+case e: FitEnd[_] => events.append(e)
+case e: TransformStart => events.append(e)
+case e: TransformEnd => events.append(e)
+case e: SaveInstanceStart if e.path.endsWith(dirName) => 
events.append(e)
+case e: SaveInstanceEnd if e.path.endsWith(dirName) => 
events.append(e)
+case _ =>
+  }
+}
+sc.addSparkListener(listener)
+
+spark = SparkSession.builder()
+  .sparkContext(sc)
+  .getOrCreate()
+
+checkpointDir = Utils.createDirectory(tempDir.getCanonicalPath, 
"checkpoints").toString
+sc.setCheckpointDir(checkpointDir)
--- End diff --

I may have missed it, but where do we use the checkpoint directory?


---




[GitHub] spark pull request #23263: [SPARK-23674][ML] Adds Spark ML Events

2018-12-08 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23263#discussion_r240003563
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---
@@ -132,7 +132,8 @@ class Pipeline @Since("1.4.0") (
* @return fitted pipeline
*/
   @Since("2.0.0")
-  override def fit(dataset: Dataset[_]): PipelineModel = {
+  override def fit(dataset: Dataset[_]): PipelineModel = super.fit(dataset)
--- End diff --

Is there any `fit` method which doesn't do `super.fit()`?


---




[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...

2018-12-08 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23253#discussion_r240003434
  
--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -37,6 +37,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, CSV datasource converts a malformed 
CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, 
returned row can contain non-`null` fields if some of CSV column values were 
parsed and converted to desired types successfully.
 
+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions 
like `from_json` convert a bad JSON record to a row with all `null`s in the 
PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, 
returned row can contain non-`null` fields if some of JSON column values were 
parsed and converted to desired types successfully.
+
--- End diff --

Ok. Sounds reasonable.


---




[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...

2018-12-08 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23253#discussion_r239998453
  
--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -37,6 +37,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, CSV datasource converts a malformed 
CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, 
returned row can contain non-`null` fields if some of CSV column values were 
parsed and converted to desired types successfully.
 
+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions 
like `from_json` convert a bad JSON record to a row with all `null`s in the 
PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, 
returned row can contain non-`null` fields if some of JSON column values were 
parsed and converted to desired types successfully.
+
--- End diff --

For such a behavior change, shall we add a config to roll back to the 
previous behavior?


---




[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...

2018-12-08 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23253#discussion_r239998485
  
--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -37,6 +37,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, CSV datasource converts a malformed 
CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, 
returned row can contain non-`null` fields if some of CSV column values were 
parsed and converted to desired types successfully.
 
+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions 
like `from_json` convert a bad JSON record to a row with all `null`s in the 
PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, 
returned row can contain non-`null` fields if some of JSON column values were 
parsed and converted to desired types successfully.
+
--- End diff --

You should also update other places that describe the previous behavior, 
such as `DataFrameReader`.


---




[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...

2018-12-08 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/20146
  
retest this please.


---




[GitHub] spark pull request #23259: [SPARK-26215][SQL][WIP] Define reserved/non-reser...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23259#discussion_r239994385
  
--- Diff: 
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---
@@ -769,7 +774,7 @@ nonReserved
 | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | RECOVER | EXPORT | 
IMPORT | LOAD | VALUES | COMMENT | ROLE
 | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | 
LOCKS | OPTION | LOCAL | INPATH
 | ASC | DESC | LIMIT | RENAME | SETS
-| AT | NULLS | OVERWRITE | ALL | ANY | ALTER | AS | BETWEEN | BY | 
CREATE | DELETE
+| AT | NULLS | OVERWRITE | ANY | ALTER | AS | BETWEEN | BY | CREATE | 
DELETE
--- End diff --

Shouldn't `ANY` move to `reserved` as well?


---




[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23253#discussion_r239994326
  
--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -37,6 +37,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, CSV datasource converts a malformed 
CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, 
returned row can contain non-`null` fields if some of CSV column values were 
parsed and converted to desired types successfully.
 
+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions 
like `from_json` convert a bad JSON record to a row with all `null`s in the 
PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, 
returned row can contain non-`null` fields if some of JSON column values were 
parsed and converted to desired types successfully.
+
--- End diff --

What happens if there is no corrupt column?


---




[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...

2018-12-07 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/20146
  
Thanks @holdenk for reviewing! I've resolved some comments and replied to 
the others. Please take a look. Thanks.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239994064
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def countByValue(
+  dataset: Dataset[_],
+  inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+val aggregator = new StringIndexerAggregator(inputCols.length)
+implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]]
+
+dataset.select(inputCols.map(col(_).cast(StringType)): _*)
+  .toDF
+  .groupBy().agg(aggregator.toColumn)
+  .as[Array[OpenHashMap[String, Long]]]
+  .collect()(0)
+  }
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => 
values.countByValue().toSeq.sortBy(-_._2)
-.map(_._1).toArray
-  case StringIndexer.frequencyAsc => 
values.countByValue().toSeq.sortBy(_._2)
-.map(_._1).toArray
-  case StringIndexer.alphabetDesc => 
values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ 
< _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+
+val filteredDF = dataset.na.drop(inputCols)
+
+// In case of equal frequency when frequencyDesc/Asc, we further sort 
the strings by alphabet.
+val labelsArray = $(stringOrderType) match {
+  case StringIndexer.frequencyDesc =>
+countByValue(filteredDF, inputCols).map { counts =>
+  counts.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray
--- End diff --

Since one `sortBy` sorts by string and the other sorts by count, can we 
replace them with a single `sortBy` on a compound key?
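
As a rough illustration of the idea, two chained stable sorts (first by label, then by descending count) give the same order as one sort on the compound key (descending count, then label). The sketch below shows the single-sort form with a Java `Comparator`; `CompoundSortSketch` and `labelsByFrequencyDesc` are hypothetical names, and the plain `Map<String, Long>` stands in for the per-column counts in the diff.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spark.
final class CompoundSortSketch {
    /** Orders labels by descending count, breaking ties alphabetically. */
    static List<String> labelsByFrequencyDesc(Map<String, Long> counts) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
        // One sort with a compound comparator instead of two chained sorts.
        entries.sort(
            Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                .thenComparing(Map.Entry.comparingByKey()));
        List<String> labels = new ArrayList<>();
        for (Map.Entry<String, Long> entry : entries) {
            labels.add(entry.getKey());
        }
        return labels;
    }
}
```

The ascending-frequency case would use the natural order on the count and keep the same tie-breaker.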


---




[GitHub] spark pull request #23258: [SPARK-23375][SQL][FOLLOWUP][TEST] Test Sort metr...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23258#discussion_r239993927
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 ---
@@ -182,10 +182,13 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
   }
 
   test("Sort metrics") {
-// Assume the execution plan is
-// WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1))
-val ds = spark.range(10).sort('id)
-testSparkPlanMetrics(ds.toDF(), 2, Map.empty)
+// Assume the execution plan with node id is
+// Sort(nodeId = 0)
+//   Exchange(nodeId = 1)
+// Range(nodeId = 2)
+val df = spark.range(9, -1, -1).sort('id).toDF()
--- End diff --

Either is fine with me, as we now add an assert to make sure the Sort node exists.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239993480
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def countByValue(
+  dataset: Dataset[_],
+  inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+val aggregator = new StringIndexerAggregator(inputCols.length)
+implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]]
+
+dataset.select(inputCols.map(col(_).cast(StringType)): _*)
+  .toDF
+  .groupBy().agg(aggregator.toColumn)
+  .as[Array[OpenHashMap[String, Long]]]
+  .collect()(0)
+  }
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => 
values.countByValue().toSeq.sortBy(-_._2)
-.map(_._1).toArray
-  case StringIndexer.frequencyAsc => 
values.countByValue().toSeq.sortBy(_._2)
-.map(_._1).toArray
-  case StringIndexer.alphabetDesc => 
values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ 
< _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+
+val filteredDF = dataset.na.drop(inputCols)
+
+// In case of equal frequency when frequencyDesc/Asc, we further sort 
the strings by alphabet.
--- End diff --

I'll also add it to the ML migration guide.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239992942
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -310,11 +439,23 @@ object StringIndexerModel extends 
MLReadable[StringIndexerModel] {
 override def load(path: String): StringIndexerModel = {
   val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
   val dataPath = new Path(path, "data").toString
-  val data = sparkSession.read.parquet(dataPath)
-.select("labels")
-.head()
-  val labels = data.getAs[Seq[String]](0).toArray
-  val model = new StringIndexerModel(metadata.uid, labels)
+
+  val (majorVersion, minorVersion) = 
majorMinorVersion(metadata.sparkVersion)
+  val labelsArray = if (majorVersion < 2 || (majorVersion == 2 && 
minorVersion <= 3)) {
--- End diff --

But this needs to change for Spark 3.0 now.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239992845
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -310,11 +439,23 @@ object StringIndexerModel extends 
MLReadable[StringIndexerModel] {
 override def load(path: String): StringIndexerModel = {
   val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
   val dataPath = new Path(path, "data").toString
-  val data = sparkSession.read.parquet(dataPath)
-.select("labels")
-.head()
-  val labels = data.getAs[Seq[String]](0).toArray
-  val model = new StringIndexerModel(metadata.uid, labels)
+
+  val (majorVersion, minorVersion) = 
majorMinorVersion(metadata.sparkVersion)
+  val labelsArray = if (majorVersion < 2 || (majorVersion == 2 && 
minorVersion <= 3)) {
--- End diff --

This is for loading an old `StringIndexerModel` saved by a previous Spark 
version. The previous model has `labels`, but the new model has `labelsArray`.
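
The version gate can be sketched as below. This is a simplified illustration, not Spark's implementation: `ModelFormatSketch`, `majorMinor`, and `usesLegacyLabels` are hypothetical names, and the first version that writes `labelsArray` is passed in as a parameter because the right cutoff depends on the release the change lands in.

```java
// Hypothetical helper, not part of Spark.
final class ModelFormatSketch {
    /** Parses the leading "major.minor" of a version string such as "2.3.2". */
    static int[] majorMinor(String sparkVersion) {
        String[] parts = sparkVersion.split("\\.");
        return new int[] {Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
    }

    /**
     * Returns true if a model saved by savedVersion predates the first release
     * (firstNewMajor.firstNewMinor) that stores `labelsArray` instead of `labels`.
     */
    static boolean usesLegacyLabels(String savedVersion, int firstNewMajor, int firstNewMinor) {
        int[] v = majorMinor(savedVersion);
        return v[0] < firstNewMajor || (v[0] == firstNewMajor && v[1] < firstNewMinor);
    }
}
```

With a cutoff of 2.4, this matches the condition in the diff (`major < 2 || (major == 2 && minor <= 3)`); moving the change to a later release only changes the two cutoff arguments.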


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239992579
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def countByValue(
+  dataset: Dataset[_],
+  inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+val aggregator = new StringIndexerAggregator(inputCols.length)
+implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]]
+
+dataset.select(inputCols.map(col(_).cast(StringType)): _*)
+  .toDF
+  .groupBy().agg(aggregator.toColumn)
+  .as[Array[OpenHashMap[String, Long]]]
+  .collect()(0)
+  }
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => 
values.countByValue().toSeq.sortBy(-_._2)
-.map(_._1).toArray
-  case StringIndexer.frequencyAsc => 
values.countByValue().toSeq.sortBy(_._2)
-.map(_._1).toArray
-  case StringIndexer.alphabetDesc => 
values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ 
< _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+
+val filteredDF = dataset.na.drop(inputCols)
+
+// In case of equal frequency when frequencyDesc/Asc, we further sort 
the strings by alphabet.
+val labelsArray = $(stringOrderType) match {
+  case StringIndexer.frequencyDesc =>
+countByValue(filteredDF, inputCols).map { counts =>
+  counts.toSeq.sortBy(_._1).sortBy(-_._2).map(_._1).toArray
+}
+  case StringIndexer.frequencyAsc =>
+countByValue(filteredDF, inputCols).map { counts =>
+  counts.toSeq.sortBy(_._1).sortBy(_._2).map(_._1).toArray
+}
+  case StringIndexer.alphabetDesc =>
+import dataset.sparkSession.implicits._
+inputCols.map { inputCol =>
+  
filteredDF.select(inputCol).distinct().sort(dataset(s"$inputCol").desc)
--- End diff --

That's good. I missed this.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239992360
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -79,26 +81,53 @@ private[feature] trait StringIndexerBase extends Params 
with HasHandleInvalid wi
   @Since("2.3.0")
   def getStringOrderType: String = $(stringOrderType)
 
-  /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType): StructType 
= {
-val inputColName = $(inputCol)
+  /** Returns the input and output column names corresponding in pair. */
+  private[feature] def getInOutCols(): (Array[String], Array[String]) = {
+ParamValidators.checkSingleVsMultiColumnParams(this, Seq(outputCol), 
Seq(outputCols))
+
+if (isSet(inputCol)) {
+  (Array($(inputCol)), Array($(outputCol)))
+} else {
+  require($(inputCols).length == $(outputCols).length,
+"The number of input columns does not match output columns")
+  ($(inputCols), $(outputCols))
+}
+  }
+
+  private def validateAndTransformField(
+  schema: StructType,
+  inputColName: String,
+  outputColName: String): StructField = {
 val inputDataType = schema(inputColName).dataType
 require(inputDataType == StringType || 
inputDataType.isInstanceOf[NumericType],
   s"The input column $inputColName must be either string type or 
numeric type, " +
 s"but got $inputDataType.")
-val inputFields = schema.fields
-val outputColName = $(outputCol)
-require(inputFields.forall(_.name != outputColName),
+require(schema.fields.forall(_.name != outputColName),
--- End diff --

Sounds good to me. Added.


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239992378
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private def countByValue(
+  dataset: Dataset[_],
+  inputCols: Array[String]): Array[OpenHashMap[String, Long]] = {
+
+val aggregator = new StringIndexerAggregator(inputCols.length)
+implicit val encoder = Encoders.kryo[Array[OpenHashMap[String, Long]]]
+
+dataset.select(inputCols.map(col(_).cast(StringType)): _*)
+  .toDF
+  .groupBy().agg(aggregator.toColumn)
+  .as[Array[OpenHashMap[String, Long]]]
+  .collect()(0)
+  }
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): StringIndexerModel = {
 transformSchema(dataset.schema, logging = true)
-val values = dataset.na.drop(Array($(inputCol)))
-  .select(col($(inputCol)).cast(StringType))
-  .rdd.map(_.getString(0))
-val labels = $(stringOrderType) match {
-  case StringIndexer.frequencyDesc => 
values.countByValue().toSeq.sortBy(-_._2)
-.map(_._1).toArray
-  case StringIndexer.frequencyAsc => 
values.countByValue().toSeq.sortBy(_._2)
-.map(_._1).toArray
-  case StringIndexer.alphabetDesc => 
values.distinct.collect.sortWith(_ > _)
-  case StringIndexer.alphabetAsc => values.distinct.collect.sortWith(_ 
< _)
-}
-copyValues(new StringIndexerModel(uid, labels).setParent(this))
+
+val (inputCols, _) = getInOutCols()
+
+val filteredDF = dataset.na.drop(inputCols)
+
+// In case of equal frequency when frequencyDesc/Asc, we further sort 
the strings by alphabet.
--- End diff --

Moved to `stringOrderType`'s doc.
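
To make the tie-breaking described in the quoted code comment concrete, here is a minimal sketch using plain Scala collections rather than Spark's `OpenHashMap`. The object name `LabelOrdering` is hypothetical. It relies on `sortBy` being a stable sort, so the alphabetical order from the first pass survives within groups of equal frequency.

```scala
// Illustrative only: `LabelOrdering` is an assumed name, not part of the PR.
object LabelOrdering {
  /** Labels by descending count; equal counts fall back to alphabetical order. */
  def frequencyDesc(counts: Map[String, Long]): Array[String] =
    counts.toSeq
      .sortBy(_._1)   // alphabetical first
      .sortBy(-_._2)  // stable sort by count keeps alphabetical order within ties
      .map(_._1)
      .toArray

  /** Labels by ascending count; equal counts fall back to alphabetical order. */
  def frequencyAsc(counts: Map[String, Long]): Array[String] =
    counts.toSeq
      .sortBy(_._1)
      .sortBy(_._2)
      .map(_._1)
      .toArray
}
```

For example, with counts `b -> 2`, `a -> 2`, `c -> 5`, the descending order puts `c` first and then `a` before `b`, regardless of the iteration order of the map.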


---




[GitHub] spark pull request #20146: [SPARK-11215][ML] Add multiple columns support to...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20146#discussion_r239991238
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -130,21 +159,60 @@ class StringIndexer @Since("1.4.0") (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
--- End diff --

Yes. Thanks.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239990986
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -38,13 +38,21 @@ case class CollectLimitExec(limit: Int, child: 
SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = 
child.executeTake(limit)
   private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
-  override lazy val metrics = 
SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+  private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+  private lazy val readMetrics =
+SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
--- End diff --

I feel it is better to rename SQLShuffleMetricsReporter to 
SQLShuffleReadMetricsReporter so that it matches 
SQLShuffleWriteMetricsReporter. It can be done in a followup.


---




[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23253#discussion_r239846601
  
--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -37,6 +37,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, CSV datasource converts a malformed 
CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, 
returned row can contain non-`null` fields if some of CSV column values were 
parsed and converted to desired types successfully.
 
+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions 
like `from_json` convert a bad JSON record to a row with all `null`s in the 
PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, 
returned row can contain non-`null` fields if some of JSON column values were 
parsed and converted to desired types successfully.
+
--- End diff --

This behavior is also defined in some other places, such as DataFrameReader.


---




[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23253#discussion_r239846300
  
--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -37,6 +37,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, CSV datasource converts a malformed 
CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, 
returned row can contain non-`null` fields if some of CSV column values were 
parsed and converted to desired types successfully.
 
+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions 
like `from_json` convert a bad JSON record to a row with all `null`s in the 
PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, 
returned row can contain non-`null` fields if some of JSON column values were 
parsed and converted to desired types successfully.
+
--- End diff --

Hmm, if the returned row contains non-null fields, how do we know whether the 
row was read from a bad JSON record or a correct one?


---




[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...

2018-12-07 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/20146
  
retest this please.


---




[GitHub] spark pull request #23224: [SPARK-26277][SQL][TEST] WholeStageCodegen metric...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23224#discussion_r239837818
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 ---
@@ -80,8 +80,10 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
 // Assume the execution plan is
 // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 
1))
 // TODO: update metrics in generated operators
-val ds = spark.range(10).filter('id < 5)
-testSparkPlanMetrics(ds.toDF(), 1, Map.empty)
+val df = spark.range(10).filter('id < 5).toDF()
+testSparkPlanMetrics(df, 1, Map.empty, true)
+
df.queryExecution.executedPlan.find(_.isInstanceOf[WholeStageCodegenExec])
+  .getOrElse(assert(false))
--- End diff --

It seems the test `Sort metrics` also has a similar issue:

```scala
test("Sort metrics") {
  // Assume the execution plan is
  // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1))
  val ds = spark.range(10).sort('id)
  testSparkPlanMetrics(ds.toDF(), 2, Map.empty)
}
```


---




[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239754619
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
@@ -118,10 +115,12 @@ case class HashClusteredDistribution(
 
 /**
  * Represents data where tuples have been ordered according to the 
`ordering`
- * [[Expression Expressions]].  This is a strictly stronger guarantee than
- * [[ClusteredDistribution]] as an ordering will ensure that tuples that 
share the
- * same value for the ordering expressions are contiguous and will never 
be split across
- * partitions.
+ * [[Expression Expressions]]. Its requirement is defined as the following:
+ *   - Given any 2 adjacent partitions, all the rows of the second 
partition must be larger than or
+ * equal to any row in the first partition, according to the 
`ordering` expressions.
--- End diff --

Why do we need the equality here? Can we just require that all the rows in the 
second partition are larger than any row in the first partition?
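
The two wordings differ only when the same ordering value sits on both sides of a partition boundary. The sketch below makes that difference concrete with plain integers. `OrderedPartitions` and `satisfies` are hypothetical names, and skipping empty partitions is an assumption made for the example, not something stated in the doc under review.

```scala
// Illustrative only: a toy check over Int rows, not Spark's Distribution API.
object OrderedPartitions {
  /**
   * For each pair of adjacent non-empty partitions, checks that every row of
   * the second is larger than (strict) or larger than or equal to (non-strict)
   * every row of the first.
   */
  def satisfies(partitions: Seq[Seq[Int]], strict: Boolean): Boolean = {
    // Assumption: empty partitions are ignored when pairing neighbours.
    val nonEmpty = partitions.filter(_.nonEmpty)
    nonEmpty.zip(nonEmpty.drop(1)).forall { case (first, second) =>
      if (strict) second.min > first.max else second.min >= first.max
    }
  }
}
```

A layout such as `Seq(Seq(1, 2, 2), Seq(2, 3))` passes the non-strict check but fails the strict one, because the value `2` appears in both partitions.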


---




[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-12-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r239744338
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -95,9 +77,116 @@ case class CreateHiveTableAsSelectCommand(
 Seq.empty[Row]
   }
 
+  // Returns `DataWritingCommand` used to write data when the table exists.
+  def writingCommandForExistingTable(
+catalog: SessionCatalog,
+tableDesc: CatalogTable): DataWritingCommand
+
+  // Returns `DataWritingCommand` used to write data when the table 
doesn't exist.
+  def writingCommandForNewTable(
+catalog: SessionCatalog,
+tableDesc: CatalogTable): DataWritingCommand
+
   override def argString: String = {
 s"[Database:${tableDesc.database}, " +
 s"TableName: ${tableDesc.identifier.table}, " +
 s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, 
storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+tableDesc: CatalogTable,
+query: LogicalPlan,
+outputColumnNames: Seq[String],
+mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def writingCommandForExistingTable(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable): DataWritingCommand = {
+InsertIntoHiveTable(
+  tableDesc,
+  Map.empty,
--- End diff --

That is a different issue. I think it is better to have a separate PR to 
address it.


---




[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...

2018-12-06 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/20146
  
ping @dbtsai 


---




[GitHub] spark issue #23239: [SPARK-26021][SQL][followup] only deal with NaN and -0.0...

2018-12-06 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23239
  
The migration guide was changed by another followup, 
https://github.com/apache/spark/pull/23141:

> In Spark version 2.4 and earlier, float/double -0.0 is semantically equal 
to 0.0, but users can still distinguish them via `Dataset.show`, 
`Dataset.collect` etc. Since Spark 3.0, float/double -0.0 is replaced by 0.0 
internally, and users can't distinguish them any more.

Is the above still correct after this change?




---




[GitHub] spark pull request #23239: [SPARK-26021][SQL][followup] only deal with NaN a...

2018-12-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23239#discussion_r239690045
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java
 ---
@@ -198,11 +198,45 @@ protected final void writeLong(long offset, long 
value) {
 Platform.putLong(getBuffer(), offset, value);
   }
 
+  // We need to take care of NaN and -0.0 in several places:
+  //   1. When compare values, different NaNs should be treated as same, 
`-0.0` and `0.0` should be
+  //  treated as same.
+  //   2. In range partitioner, different NaNs should belong to the same 
partition, -0.0 and 0.0
--- End diff --

As this is not a problem, we should update the PR description too.


---




[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-12-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r239668492
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -95,9 +77,116 @@ case class CreateHiveTableAsSelectCommand(
 Seq.empty[Row]
   }
 
+  // Returns `DataWritingCommand` used to write data when the table exists.
+  def writingCommandForExistingTable(
+catalog: SessionCatalog,
+tableDesc: CatalogTable): DataWritingCommand
+
+  // Returns `DataWritingCommand` used to write data when the table 
doesn't exist.
+  def writingCommandForNewTable(
+catalog: SessionCatalog,
+tableDesc: CatalogTable): DataWritingCommand
+
   override def argString: String = {
 s"[Database:${tableDesc.database}, " +
 s"TableName: ${tableDesc.identifier.table}, " +
 s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, 
storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+tableDesc: CatalogTable,
+query: LogicalPlan,
+outputColumnNames: Seq[String],
+mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def writingCommandForExistingTable(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable): DataWritingCommand = {
+InsertIntoHiveTable(
+  tableDesc,
+  Map.empty,
--- End diff --

Yes, I've discussed this with @cloud-fan. DataFrameWriter allows dynamic 
partitions on Hive CTAS. For now, it seems this syntax is not allowed by 
SparkSqlParser.


---




[GitHub] spark pull request #23215: [SPARK-26263][SQL] Validate partition values with...

2018-12-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23215#discussion_r239460682
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 ---
@@ -272,9 +279,14 @@ object PartitioningUtils {
   val literal = if (userSpecifiedDataTypes.contains(columnName)) {
 // SPARK-26188: if user provides corresponding column schema, get 
the column value without
 //  inference, and then cast it as user specified data 
type.
-val columnValue = inferPartitionColumnValue(rawColumnValue, false, 
timeZone)
-val castedValue =
-  Cast(columnValue, userSpecifiedDataTypes(columnName), 
Option(timeZone.getID)).eval()
+val dataType = userSpecifiedDataTypes(columnName)
+val columnValueLiteral = inferPartitionColumnValue(rawColumnValue, 
false, timeZone)
+val columnValue = columnValueLiteral.eval()
+val castedValue = Cast(columnValueLiteral, dataType, 
Option(timeZone.getID)).eval()
+if (validatePartitionColumns && columnValue != null && castedValue 
== null) {
+  throw new RuntimeException(s"Failed to cast value `$columnValue` 
to `$dataType` " +
+s"for partition column `$columnName`")
+}
 Literal.create(castedValue, userSpecifiedDataTypes(columnName))
--- End diff --

`Literal.create(castedValue, dataType)`


---




[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...

2018-12-06 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/22514
  
@cloud-fan I've updated the PR description. Thanks.


---




[GitHub] spark issue #23215: [SPARK-26263][SQL] Validate partition values with user p...

2018-12-06 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23215
  
Sounds good to me too. As there is a config, it is good that we can still 
disable it.


---




[GitHub] spark pull request #23215: [SPARK-26263][SQL] Validate partition values with...

2018-12-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23215#discussion_r239388954
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 ---
@@ -272,9 +279,13 @@ object PartitioningUtils {
   val literal = if (userSpecifiedDataTypes.contains(columnName)) {
 // SPARK-26188: if user provides corresponding column schema, get 
the column value without
 //  inference, and then cast it as user specified data 
type.
-val columnValue = inferPartitionColumnValue(rawColumnValue, false, 
timeZone)
-val castedValue =
-  Cast(columnValue, userSpecifiedDataTypes(columnName), 
Option(timeZone.getID)).eval()
+val dataType = userSpecifiedDataTypes(columnName)
+val columnValueLiteral = inferPartitionColumnValue(rawColumnValue, 
false, timeZone)
+val columnValue = columnValueLiteral.eval()
+val castedValue = Cast(columnValueLiteral, dataType, 
Option(timeZone.getID)).eval()
+if (validatePartitionColumns && columnValue != null && castedValue 
== null) {
+  throw new RuntimeException(s"Failed to cast partition value 
`$columnValue` to $dataType")
--- End diff --

Can we also show `columnName` in this exception message, so that it is easier 
to know which partition column has the error?
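
A minimal sketch of the kind of check being discussed, in plain Scala without Catalyst. `PartitionValueCheck`, `castToInt` and `validatedInt` are hypothetical names; `castToInt` stands in for a cast that produces no value on failure (the diff uses `Cast(...).eval()` returning `null`), and the message wording follows the later revision of the diff.

```scala
import scala.util.Try

// Illustrative only: these names are assumptions, not Spark APIs.
object PartitionValueCheck {
  /** Stand-in for a cast that yields None instead of failing. */
  def castToInt(raw: String): Option[Int] = Try(raw.trim.toInt).toOption

  /**
   * Casts a raw partition value and, when validation is enabled, fails with a
   * message that names the partition column as well as the offending value.
   */
  def validatedInt(columnName: String, raw: String, validate: Boolean): Option[Int] = {
    val casted = castToInt(raw)
    if (validate && raw != null && casted.isEmpty) {
      throw new RuntimeException(
        s"Failed to cast value `$raw` to `IntegerType` for partition column `$columnName`")
    }
    casted
  }
}
```

With the column name in the message, a failure on a path such as `year=abc` points directly at `year` rather than only at the bad value.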


---




[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-12-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r239323943
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -95,9 +77,116 @@ case class CreateHiveTableAsSelectCommand(
 Seq.empty[Row]
   }
 
+  // Returns `DataWritingCommand` used to write data when the table exists.
+  def writingCommandForExistingTable(
+catalog: SessionCatalog,
+tableDesc: CatalogTable): DataWritingCommand
+
+  // Returns `DataWritingCommand` used to write data when the table 
doesn't exist.
+  def writingCommandForNewTable(
+catalog: SessionCatalog,
+tableDesc: CatalogTable): DataWritingCommand
+
   override def argString: String = {
 s"[Database:${tableDesc.database}, " +
 s"TableName: ${tableDesc.identifier.table}, " +
 s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, 
storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+tableDesc: CatalogTable,
+query: LogicalPlan,
+outputColumnNames: Seq[String],
+mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def writingCommandForExistingTable(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable): DataWritingCommand = {
+InsertIntoHiveTable(
+  tableDesc,
+  Map.empty,
--- End diff --


https://github.com/apache/spark/blob/8534d753ecb21ea64ffbaefb5eaca38ba0464c6d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala#L686-L697


---




[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...

2018-12-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23213
  
I think so. I don't know whether @cloud-fan or @mgaido91 have other opinions.


---




[GitHub] spark issue #23236: [SPARK-26275][PYTHON][ML] Increases timeout for Streamin...

2018-12-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23236
  
> Compared to the error values from the test failures above, they match up 
until the 10th batch but then these continue until the 16th where it has a 
timeout

I suspect that might be because, when resource usage is heavy, 
`StreamingLogisticRegressionWithSGD`'s training on the input batch stream 
can't always keep up with the prediction batch stream, so the model hasn't yet 
reached the expected improvement in error.


---




[GitHub] spark pull request #23239: [SPARK-26021][SQL][followup] only deal with NaN a...

2018-12-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23239#discussion_r239302780
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java
 ---
@@ -198,11 +198,45 @@ protected final void writeLong(long offset, long 
value) {
 Platform.putLong(getBuffer(), offset, value);
   }
 
+  // We need to take care of NaN and -0.0 in several places:
+  //   1. When compare values, different NaNs should be treated as same, 
`-0.0` and `0.0` should be
+  //  treated as same.
+  //   2. In range partitioner, different NaNs should belong to the same 
partition, -0.0 and 0.0
--- End diff --

Do we already have a related test for case 2?
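
For context on why the quoted code comment singles out NaN and -0.0, here is a minimal sketch in plain Scala. `FloatNormalization`, `normalize` and `bits` are hypothetical names and the code is not taken from the PR; it only shows that values which compare as equal (or are all "NaN") can carry different raw bits, and one way to collapse them to a single representation.

```scala
import java.lang.{Double => JDouble}

// Illustrative only: these names are assumptions, not Spark APIs.
object FloatNormalization {
  /** Maps every NaN to the canonical NaN and -0.0 to 0.0; other values pass through. */
  def normalize(value: Double): Double =
    if (value.isNaN) Double.NaN
    else if (value == -0.0d) 0.0d // this comparison is true for both 0.0 and -0.0
    else value

  /** Raw IEEE 754 bits, i.e. what a byte-wise comparison or hash would see. */
  def bits(value: Double): Long = JDouble.doubleToRawLongBits(value)
}
```

`-0.0d == 0.0d` is true, yet `bits(-0.0d)` and `bits(0.0d)` differ. After `normalize`, both produce the same bits, which is the property a binary comparison or a partitioner working on raw bytes would need.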


---




[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-12-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r239300131
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -95,9 +77,116 @@ case class CreateHiveTableAsSelectCommand(
 Seq.empty[Row]
   }
 
+  // Returns `DataWritingCommand` used to write data when the table exists.
+  def writingCommandForExistingTable(
+catalog: SessionCatalog,
+tableDesc: CatalogTable): DataWritingCommand
+
+  // Returns `DataWritingCommand` used to write data when the table 
doesn't exist.
+  def writingCommandForNewTable(
+catalog: SessionCatalog,
+tableDesc: CatalogTable): DataWritingCommand
+
   override def argString: String = {
 s"[Database:${tableDesc.database}, " +
 s"TableName: ${tableDesc.identifier.table}, " +
 s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, 
storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+tableDesc: CatalogTable,
+query: LogicalPlan,
+outputColumnNames: Seq[String],
+mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def writingCommandForExistingTable(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable): DataWritingCommand = {
+InsertIntoHiveTable(
+  tableDesc,
+  Map.empty,
--- End diff --

I think it is because we don't allow a Hive CTAS statement to create a 
partitioned table. I saw there is a test for it in Hive's SQLQuerySuite.


---




[GitHub] spark issue #23236: [SPARK-26275][PYTHON][ML] Increases timeout for Streamin...

2018-12-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23236
  
I agree with @BryanCutler's analysis; a few things about this test look a bit 
weird. I also think it is fine to increase the timeout for now.


---




[GitHub] spark pull request #23231: [SPARK-26273][ML] Add OneHotEncoderEstimator as a...

2018-12-05 Thread viirya
Github user viirya closed the pull request at:

https://github.com/apache/spark/pull/23231


---




[GitHub] spark issue #23231: [SPARK-26273][ML] Add OneHotEncoderEstimator as alias to...

2018-12-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23231
  
Then let me close this now.


---




[GitHub] spark issue #23231: [SPARK-26273][ML] Add OneHotEncoderEstimator as alias to...

2018-12-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23231
  
OK. Maybe we can add a few words to the ML migration guide to announce this 
clearly.


---




[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...

2018-12-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23213
  
`wholeStage=false, factoryMode=CODE_ONLY` and `wholeStage=false, 
factoryMode=NO_CODEGEN` should have more complete test coverage for 
`GenerateUnsafeProject`, `GenerateMutableProject`, etc.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23231: [SPARK-26273][ML] Add OneHotEncoderEstimator as alias to...

2018-12-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23231
  
It is because the ML migration guide claims that we will keep 
OneHotEncoderEstimator as an alias. I'm fine with it if we now have consensus 
that we can avoid such an alias.


---




[GitHub] spark issue #23230: [SPARK-26133][ML][Followup] Fix doc for OneHotEncoder

2018-12-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23230
  
Thanks @HyukjinKwon 





[GitHub] spark pull request #23231: [SPARK-26273][ML] Add OneHotEncoderEstimator as a...

2018-12-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23231#discussion_r239011539
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala 
---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.apache.spark.annotation.Since
+import org.apache.spark.ml.Estimator
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.util._
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A one-hot encoder that maps a column of category indices to a column of binary vectors, with
+ * at most a single one-value per row that indicates the input category index.
+ * For example with 5 categories, an input value of 2.0 would map to an output vector of
+ * `[0.0, 0.0, 1.0, 0.0]`.
+ * The last category is not included by default (configurable via `dropLast`),
+ * because it makes the vector entries sum up to one, and hence linearly dependent.
+ * So an input value of 4.0 maps to `[0.0, 0.0, 0.0, 0.0]`.
+ *
+ * @note This is different from scikit-learn's OneHotEncoder, which keeps all categories.
+ * The output vectors are sparse.
+ *
+ * When `handleInvalid` is configured to 'keep', an extra "category" indicating invalid values is
+ * added as last category. So when `dropLast` is true, invalid values are encoded as all-zeros
+ * vector.
+ *
+ * @note When encoding multi-column by using `inputCols` and `outputCols` params, input/output cols
+ * come in pairs, specified by the order in the arrays, and each pair is treated independently.
+ *
+ * @note `OneHotEncoderEstimator` is renamed to `OneHotEncoder` in 3.0.0. This
+ * `OneHotEncoderEstimator` is kept as an alias and will be removed in further version.
+ *
+ * @see `StringIndexer` for converting categorical values into category indices
+ */
+@Since("2.3.0")
--- End diff --

These `@Since` tags are from the original OneHotEncoderEstimator.
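
For readers unfamiliar with the `dropLast` behaviour described in the quoted Scaladoc, a minimal sketch in plain Java follows. It uses dense arrays for simplicity (the real encoder emits sparse vectors), and `OneHotSketch.encode` is a hypothetical helper, not the Spark API.

```java
import java.util.Arrays;

final class OneHotSketch {
    /**
     * Encodes a category index as a dense one-hot vector.
     * With dropLast, the vector has one fewer slot and the last
     * category is represented by the all-zeros vector.
     */
    static double[] encode(int index, int numCategories, boolean dropLast) {
        if (index < 0 || index >= numCategories) {
            throw new IllegalArgumentException("invalid category index: " + index);
        }
        int size = dropLast ? numCategories - 1 : numCategories;
        double[] vector = new double[size]; // Java zero-initializes the array
        if (index < size) {
            vector[index] = 1.0;
        }
        return vector;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(encode(2, 5, true))); // [0.0, 0.0, 1.0, 0.0]
        System.out.println(Arrays.toString(encode(4, 5, true))); // [0.0, 0.0, 0.0, 0.0]
    }
}
```

The same rule explains the `handleInvalid = 'keep'` note: the extra "invalid" category becomes the last one, so with `dropLast` it is the category that collapses to all zeros.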





[GitHub] spark issue #23231: [SPARK-26273][ML] Add OneHotEncoderEstimator as alias to...

2018-12-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23231
  
cc @srowen @dbtsai 





[GitHub] spark pull request #23231: [SPARK-26273][ML] Add OneHotEncoderEstimator as a...

2018-12-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23231#discussion_r239008438
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderEstimatorSuite.scala
 ---
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.apache.spark.ml.attribute.{AttributeGroup, BinaryAttribute, 
NominalAttribute}
+import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
+import org.apache.spark.ml.param.ParamsSuite
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
+import org.apache.spark.sql.{Encoder, Row}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types._
+
+class OneHotEncoderEstimatorSuite extends MLTest with DefaultReadWriteTest 
{
--- End diff --

The fitting of OneHotEncoderEstimator is actually done by OneHotEncoder. 
OneHotEncoderEstimator is just an alias. I'm not sure if we really need to add 
this test suite for it.





[GitHub] spark pull request #23231: [SPARK-26273][ML] Add OneHotEncoderEstimator as a...

2018-12-05 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/23231

[SPARK-26273][ML] Add OneHotEncoderEstimator as alias to OneHotEncoder

## What changes were proposed in this pull request?

SPARK-26133 removed the deprecated OneHotEncoder and renamed 
OneHotEncoderEstimator to OneHotEncoder.

According to the ML migration doc, we need to keep OneHotEncoderEstimator as an 
alias to OneHotEncoder.

This PR adds that alias.

## How was this patch tested?

Added tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 one-hot-encoder-estimator-alias

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23231.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23231


commit 17160710cadc49b54f4385ae3ca9ddb0eb4034b0
Author: Liang-Chi Hsieh 
Date:   2018-12-05T09:27:58Z

Add OneHotEncoderEstimator as alias to OneHotEncoder.







[GitHub] spark issue #23230: [SPARK-26133][ML][Followup] Fix doc for OneHotEncoder

2018-12-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23230
  
cc @HyukjinKwon @srowen 





[GitHub] spark pull request #23230: [SPARK-26133][ML][Followup] Fix doc for OneHotEnc...

2018-12-05 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/23230

[SPARK-26133][ML][Followup] Fix doc for OneHotEncoder

## What changes were proposed in this pull request?

This fixes the doc of the renamed OneHotEncoder in PySpark.

## How was this patch tested?

N/A

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 remove_one_hot_encoder_followup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23230.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23230


commit c84886aef9a53d0d58ca4f0f68ece57ee80f88c8
Author: Liang-Chi Hsieh 
Date:   2018-12-05T10:08:01Z

Fix doc for OneHotEncoder.







[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-12-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r238909363
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -181,62 +180,39 @@ case class RelationConversions(
 conf: SQLConf,
 sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
   private def isConvertible(relation: HiveTableRelation): Boolean = {
-val serde = 
relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-serde.contains("parquet") && 
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
-  serde.contains("orc") && 
conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
+isConvertible(relation.tableMeta)
   }
 
-  // Return true for Apache ORC and Hive ORC-related configuration names.
-  // Note that Spark doesn't support configurations like 
`hive.merge.orcfile.stripe.level`.
-  private def isOrcProperty(key: String) =
-key.startsWith("orc.") || key.contains(".orc.")
-
-  private def isParquetProperty(key: String) =
-key.startsWith("parquet.") || key.contains(".parquet.")
-
-  private def convert(relation: HiveTableRelation): LogicalRelation = {
-val serde = 
relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-
-// Consider table and storage properties. For properties existing in 
both sides, storage
-// properties will supersede table properties.
-if (serde.contains("parquet")) {
-  val options = 
relation.tableMeta.properties.filterKeys(isParquetProperty) ++
-relation.tableMeta.storage.properties + 
(ParquetOptions.MERGE_SCHEMA ->
-
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
-  sessionCatalog.metastoreCatalog
-.convertToLogicalRelation(relation, options, 
classOf[ParquetFileFormat], "parquet")
-} else {
-  val options = 
relation.tableMeta.properties.filterKeys(isOrcProperty) ++
-relation.tableMeta.storage.properties
-  if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
-sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-  relation,
-  options,
-  
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
-  "orc")
-  } else {
-sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-  relation,
-  options,
-  classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
-  "orc")
-  }
-}
+  private def isConvertible(tableMeta: CatalogTable): Boolean = {
+val serde = 
tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+serde.contains("parquet") && 
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
+  serde.contains("orc") && 
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
+  private val metastoreCatalog = sessionCatalog.metastoreCatalog
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
 plan resolveOperators {
   // Write path
   case InsertIntoTable(r: HiveTableRelation, partition, query, 
overwrite, ifPartitionNotExists)
 // Inserting into partitioned table is not supported in 
Parquet/Orc data source (yet).
   if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
 !r.isPartitioned && isConvertible(r) =>
-InsertIntoTable(convert(r), partition, query, overwrite, 
ifPartitionNotExists)
+InsertIntoTable(metastoreCatalog.convert(r), partition,
+  query, overwrite, ifPartitionNotExists)
 
   // Read path
   case relation: HiveTableRelation
   if DDLUtils.isHiveTable(relation.tableMeta) && 
isConvertible(relation) =>
-convert(relation)
+metastoreCatalog.convert(relation)
+
+  // CTAS
+  case CreateTable(tableDesc, mode, Some(query))
+  if DDLUtils.isHiveTable(tableDesc) && 
tableDesc.partitionColumnNames.isEmpty &&
+isConvertible(tableDesc) =>
--- End diff --

ok.





[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-12-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r238902415
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -181,62 +180,39 @@ case class RelationConversions(
 conf: SQLConf,
 sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
   private def isConvertible(relation: HiveTableRelation): Boolean = {
-val serde = 
relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-serde.contains("parquet") && 
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
-  serde.contains("orc") && 
conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
+isConvertible(relation.tableMeta)
   }
 
-  // Return true for Apache ORC and Hive ORC-related configuration names.
-  // Note that Spark doesn't support configurations like 
`hive.merge.orcfile.stripe.level`.
-  private def isOrcProperty(key: String) =
-key.startsWith("orc.") || key.contains(".orc.")
-
-  private def isParquetProperty(key: String) =
-key.startsWith("parquet.") || key.contains(".parquet.")
-
-  private def convert(relation: HiveTableRelation): LogicalRelation = {
-val serde = 
relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-
-// Consider table and storage properties. For properties existing in 
both sides, storage
-// properties will supersede table properties.
-if (serde.contains("parquet")) {
-  val options = 
relation.tableMeta.properties.filterKeys(isParquetProperty) ++
-relation.tableMeta.storage.properties + 
(ParquetOptions.MERGE_SCHEMA ->
-
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
-  sessionCatalog.metastoreCatalog
-.convertToLogicalRelation(relation, options, 
classOf[ParquetFileFormat], "parquet")
-} else {
-  val options = 
relation.tableMeta.properties.filterKeys(isOrcProperty) ++
-relation.tableMeta.storage.properties
-  if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
-sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-  relation,
-  options,
-  
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
-  "orc")
-  } else {
-sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-  relation,
-  options,
-  classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
-  "orc")
-  }
-}
+  private def isConvertible(tableMeta: CatalogTable): Boolean = {
+val serde = 
tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+serde.contains("parquet") && 
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
+  serde.contains("orc") && 
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
+  private val metastoreCatalog = sessionCatalog.metastoreCatalog
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
 plan resolveOperators {
   // Write path
   case InsertIntoTable(r: HiveTableRelation, partition, query, 
overwrite, ifPartitionNotExists)
 // Inserting into partitioned table is not supported in 
Parquet/Orc data source (yet).
   if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
 !r.isPartitioned && isConvertible(r) =>
-InsertIntoTable(convert(r), partition, query, overwrite, 
ifPartitionNotExists)
+InsertIntoTable(metastoreCatalog.convert(r), partition,
+  query, overwrite, ifPartitionNotExists)
 
   // Read path
   case relation: HiveTableRelation
   if DDLUtils.isHiveTable(relation.tableMeta) && 
isConvertible(relation) =>
-convert(relation)
+metastoreCatalog.convert(relation)
+
+  // CTAS
+  case CreateTable(tableDesc, mode, Some(query))
+  if DDLUtils.isHiveTable(tableDesc) && 
tableDesc.partitionColumnNames.isEmpty &&
+isConvertible(tableDesc) =>
--- End diff --

Hmm, the optimization is already controlled by configs like 
`HiveUtils.CONVERT_METASTORE_ORC` and `HiveUtils.CONVERT_METASTORE_PARQUET`. Do 
we need another config for it?
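
As a rough illustration of how the two existing flags already gate the conversion, here is the `isConvertible` check from the quoted diff restated in plain Java. The boolean parameters stand in for the values of `CONVERT_METASTORE_PARQUET` and `CONVERT_METASTORE_ORC`; `ConvertibleSketch` is a hypothetical name and nothing here calls Spark.

```java
import java.util.Locale;
import java.util.Optional;

final class ConvertibleSketch {
    /**
     * Mirrors the serde check in the diff: a table is convertible when its
     * serde name mentions "parquet" or "orc" and the matching flag is on.
     */
    static boolean isConvertible(Optional<String> serde,
                                 boolean convertParquet,
                                 boolean convertOrc) {
        String name = serde.orElse("").toLowerCase(Locale.ROOT);
        return (name.contains("parquet") && convertParquet)
            || (name.contains("orc") && convertOrc);
    }

    public static void main(String[] args) {
        Optional<String> parquetSerde =
            Optional.of("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe");
        // With the Parquet flag off, the table stays on the Hive write path.
        System.out.println(isConvertible(parquetSerde, false, true));
        System.out.println(isConvertible(parquetSerde, true, false));
    }
}
```

Under this reading, turning either flag off already disables the optimized CTAS path for that format, which is why a separate switch may be redundant.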





[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-12-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r238707304
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -95,9 +77,127 @@ case class CreateHiveTableAsSelectCommand(
 Seq.empty[Row]
   }
 
+  // Returns `DataWritingCommand` used to write data when the table exists.
+  def writingCommandForExistingTable(
+catalog: SessionCatalog,
+tableDesc: CatalogTable): DataWritingCommand
+
+  // Returns `DataWritingCommand` used to write data when the table 
doesn't exist.
+  def writingCommandForNewTable(
+catalog: SessionCatalog,
+tableDesc: CatalogTable): DataWritingCommand
+
   override def argString: String = {
 s"[Database:${tableDesc.database}, " +
 s"TableName: ${tableDesc.identifier.table}, " +
 s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, 
storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+tableDesc: CatalogTable,
+query: LogicalPlan,
+outputColumnNames: Seq[String],
+mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def writingCommandForExistingTable(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable): DataWritingCommand = {
+InsertIntoHiveTable(
+  tableDesc,
+  Map.empty,
+  query,
+  overwrite = false,
+  ifPartitionNotExists = false,
+  outputColumnNames = outputColumnNames)
+  }
+
+  override def writingCommandForNewTable(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable): DataWritingCommand = {
+// For CTAS, there is no static partition values to insert.
+val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
+InsertIntoHiveTable(
+  tableDesc,
+  partition,
+  query,
+  overwrite = true,
+  ifPartitionNotExists = false,
+  outputColumnNames = outputColumnNames)
+  }
+}
+
+/**
+ * Create table and insert the query result into it. This creates Hive 
table but inserts
+ * the query result into it by using data source.
+ *
+ * @param tableDesc the table description, which may contain serde, 
storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class OptimizedCreateHiveTableAsSelectCommand(
+tableDesc: CatalogTable,
+query: LogicalPlan,
+outputColumnNames: Seq[String],
+mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  private def getHadoopRelation(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable): HadoopFsRelation = {
+val metastoreCatalog = 
catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
+val hiveTable = DDLUtils.readHiveTable(tableDesc)
+
+metastoreCatalog.convert(hiveTable) match {
+  case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
+  case _ => throw new AnalysisException(s"$tableIdentifier should be 
converted to " +
+"HadoopFsRelation.")
+}
+  }
+
+  override def writingCommandForExistingTable(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable): DataWritingCommand = {
+val hadoopRelation = getHadoopRelation(catalog, tableDesc)
+InsertIntoHadoopFsRelationCommand(
+  hadoopRelation.location.rootPaths.head,
+  Map.empty, // We don't support to convert partitioned table.
+  false,
+  Seq.empty, // We don't support to convert partitioned table.
+  hadoopRelation.bucketSpec,
+  hadoopRelation.fileFormat,
+  hadoopRelation.options,
+  query,
+  mode,
+  Some(tableDesc),
+  Some(hadoopRelation.location),
+  query.output.map(_.name))
+  }
+
+  override def writingCommandForNewTable(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable): DataWritingCommand = {
+val hadoopRelation = getHadoopRelation(catalog, tableDesc)
+InsertIntoHadoopFsRelationCommand(
+  hadoopRelation.location.rootPaths.head,
+  Map.empty, // We don't support to convert partitioned table.
+  false,
+  Seq.empty, // We don't support to convert partitioned table.
+  hadoopRelation.bucketSpec,
+  hadoopRelation.fileFormat,
+  hadoopRelation.options,
+  query,
   

[GitHub] spark pull request #23213: [SPARK-26262][SQL] Run SQLQueryTestSuite with WHO...

2018-12-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23213#discussion_r238696052
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
@@ -144,9 +144,10 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSQLContext {
 val (comments, code) = input.split("\n").partition(_.startsWith("--"))
 
 // Runs all the tests on both codegen-only and interpreter modes
-val codegenConfigSets = Array(CODEGEN_ONLY, NO_CODEGEN).map {
-  case codegenFactoryMode =>
-Array(SQLConf.CODEGEN_FACTORY_MODE.key -> 
codegenFactoryMode.toString)
+val codegenConfigSets = Array(("false", "NO_CODEGEN"), ("true", 
"CODEGEN_ONLY")).map {
--- End diff --

Will this increase the test time too much?





[GitHub] spark pull request #23213: [SPARK-26262][SQL] Run SQLQueryTestSuite with WHO...

2018-12-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23213#discussion_r238695610
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
---
@@ -2899,6 +2899,144 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
   }
 }
   }
+
+  private def checkKeywordsExistsInExplain(df: DataFrame, keywords: 
String*): Unit = {
+val output = new java.io.ByteArrayOutputStream()
+Console.withOut(output) {
+  df.explain(extended = true)
+}
+val normalizedOutput = output.toString.replaceAll("#\\d+", "#x")
+for (key <- keywords) {
+  assert(normalizedOutput.contains(key))
+}
+  }
+
+  test("optimized plan should show the rewritten aggregate expression") {
--- End diff --

Please update the PR description too.





[GitHub] spark issue #23214: [SPARK-26155] Optimizing the performance of LongToUnsafe...

2018-12-04 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23214
  
Thanks for doing this. I think we are closer to the root cause.





[GitHub] spark issue #23204: Revert "[SPARK-21052][SQL] Add hash map metrics to join"

2018-12-03 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23204
  
Is this observable in general hash join queries, other than TPC-DS Q19?





[GitHub] spark pull request #23204: Revert "[SPARK-21052][SQL] Add hash map metrics t...

2018-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23204#discussion_r238270550
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
 ---
@@ -483,8 +470,6 @@ private[execution] final class LongToUnsafeRowMap(val 
mm: TaskMemoryManager, cap
*/
   def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = {
 if (isDense) {
-  numKeyLookups += 1
-  numProbes += 1
--- End diff --

If, as your test shows, this is the cause of the performance regression, we 
can revert just this and the related changes. The changes in 
`HashAggregateExec`, etc. can be kept.





[GitHub] spark issue #23203: [SPARK-26252][PYTHON] Add support to run specific unitte...

2018-12-03 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23203
  
I haven't looked closely at the changes yet, but I think this should be very 
useful. Thanks @HyukjinKwon


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23184: [SPARK-26227][R] from_[csv|json] should accept sc...

2018-12-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23184#discussion_r238065068
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala 
---
@@ -225,4 +225,10 @@ private[sql] object SQLUtils extends Logging {
 }
 sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
   }
+
+  def createArrayType(elementType: DataType): ArrayType = 
DataTypes.createArrayType(elementType)
+
+  def createArrayType(elementType: Column): ArrayType = {
+new ArrayType(ExprUtils.evalTypeExpr(elementType.expr), true)
--- End diff --

oh, I see.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23184: [SPARK-26227][R] from_[csv|json] should accept sc...

2018-11-30 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23184#discussion_r237896514
  
--- Diff: R/pkg/R/functions.R ---
@@ -202,8 +202,9 @@ NULL
 #'  \itemize{
 #'  \item \code{from_json}: a structType object to use as the 
schema to use
 #'  when parsing the JSON string. Since Spark 2.3, the 
DDL-formatted string is
-#'  also supported for the schema.
-#'  \item \code{from_csv}: a DDL-formatted string
+#'  also supported for the schema. Since Spark 3.0, 
\code{schema_of_json} or
+#'  a string literal can also be accepted.
--- End diff --

What does `a string literal` mean here? Is it different from a DDL-formatted string?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23184: [SPARK-26227][R] from_[csv|json] should accept sc...

2018-11-30 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23184#discussion_r237898787
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala 
---
@@ -225,4 +225,10 @@ private[sql] object SQLUtils extends Logging {
 }
 sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
   }
+
+  def createArrayType(elementType: DataType): ArrayType = 
DataTypes.createArrayType(elementType)
+
+  def createArrayType(elementType: Column): ArrayType = {
--- End diff --

`column` instead of `elementType`?





[GitHub] spark pull request #23184: [SPARK-26227][R] from_[csv|json] should accept sc...

2018-11-30 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23184#discussion_r237899057
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala 
---
@@ -225,4 +225,10 @@ private[sql] object SQLUtils extends Logging {
 }
 sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
   }
+
+  def createArrayType(elementType: DataType): ArrayType = 
DataTypes.createArrayType(elementType)
+
+  def createArrayType(elementType: Column): ArrayType = {
+new ArrayType(ExprUtils.evalTypeExpr(elementType.expr), true)
--- End diff --

Instead of `true`, can we use `elementType.expr.nullable`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...

2018-11-30 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22957#discussion_r237818279
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 ---
@@ -195,14 +195,35 @@ abstract class Expression extends 
TreeNode[Expression] {
   }
 
   /**
-   * Returns true when two expressions will always compute the same result, even if they differ
+   * Returns true when two expressions will always compute the same output, even if they differ
    * cosmetically (i.e. capitalization of names in attributes may be different).
    *
    * See [[Canonicalize]] for more details.
+   *
+   * This method should be used (instead of `sameResult`) when comparing if 2 expressions are the
+   * same and one can replace the other (eg. in Optimizer/Analyzer rules where we want to replace
+   * equivalent expressions). It should not be used (and `sameResult` should be used instead) when
+   * comparing if 2 expressions produce the same results (in this case `semanticEquals` can be too
+   * strict).
    */
   def semanticEquals(other: Expression): Boolean =
     deterministic && other.deterministic && canonicalized == other.canonicalized
 
+  /**
+   * Returns true when two expressions will always compute the same result, even if the output may
+   * be different, because of different names or similar differences.
--- End diff --

How about replacing `output` with `output from plan perspective`?

```
Returns true when two expressions will always compute the same result, even if the output
from plan perspective may be different, because of different names or similar differences.
```
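
To illustrate the distinction the Scaladoc is drawing, here is a toy model in plain Java. It is only a sketch under my own assumptions: the `Expr` classes, the string-based canonical form, and the alias stripping are hypothetical stand-ins, not Catalyst's implementation, and the determinism check is omitted.

```java
import java.util.Locale;

final class ExprSketch {
    interface Expr {
        /** Canonical text form; attribute capitalization is ignored. */
        String canonical();
    }

    static final class Attr implements Expr {
        final String name;
        Attr(String name) { this.name = name; }
        public String canonical() { return name.toLowerCase(Locale.ROOT); }
    }

    static final class Add implements Expr {
        final Expr left;
        final Expr right;
        Add(Expr left, Expr right) { this.left = left; this.right = right; }
        public String canonical() {
            return "(" + left.canonical() + " + " + right.canonical() + ")";
        }
    }

    static final class Alias implements Expr {
        final Expr child;
        final String name;
        Alias(Expr child, String name) { this.child = child; this.name = name; }
        // The alias name is part of the output, so it stays in the canonical form.
        public String canonical() {
            return child.canonical() + " AS " + name.toLowerCase(Locale.ROOT);
        }
    }

    /** Removes any aliases wrapped around the top of an expression. */
    static Expr stripAlias(Expr e) {
        while (e instanceof Alias) {
            e = ((Alias) e).child;
        }
        return e;
    }

    /** Same output: one expression can replace the other. */
    static boolean semanticEquals(Expr a, Expr b) {
        return a.canonical().equals(b.canonical());
    }

    /** Same computed result, even if the output name differs. */
    static boolean sameResult(Expr a, Expr b) {
        return stripAlias(a).canonical().equals(stripAlias(b).canonical());
    }
}
```

In this model `a + b AS x` and `A + B AS y` fail `semanticEquals` but pass `sameResult`, which is the "output from plan perspective may be different" case.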





[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...

2018-11-30 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/22514
  
retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...

2018-11-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23152#discussion_r237768463
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala 
---
@@ -2276,4 +2276,16 @@ class SQLQuerySuite extends QueryTest with 
SQLTestUtils with TestHiveSingleton {
 }
   }
 
+
+  test("SPARK-26181 hasMinMaxStats method of ColumnStatsMap is not 
correct") {
+withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
+  withTable("all_null") {
+sql("create table all_null (attrInt int)")
+sql("insert into all_null values (null)")
+sql("analyze table all_null compute statistics for columns 
attrInt")
+checkAnswer(sql("select * from all_null where attrInt < 1"), Nil)
--- End diff --

This test can pass without this patch.





[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-11-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r237749421
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala ---
@@ -92,4 +92,18 @@ class HiveParquetSuite extends QueryTest with 
ParquetTest with TestHiveSingleton
   }
 }
   }
+
+  test("SPARK-25271: write empty map into hive parquet table") {
--- End diff --

Added a new test for that.


---




[GitHub] spark issue #22957: [SPARK-25951][SQL] Ignore aliases for distributions and ...

2018-11-29 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/22957
  
Btw, I think we can update the PR title and description to reflect the new 
changes.


---




[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...

2018-11-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22957#discussion_r237749287
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
@@ -780,6 +780,23 @@ class PlannerSuite extends SharedSQLContext {
 classOf[PartitioningCollection])
 }
   }
+
+  test("SPARK-25951: avoid redundant shuffle on rename") {
--- End diff --

+1 if possible.


---




[GitHub] spark issue #22957: [SPARK-25951][SQL] Ignore aliases for distributions and ...

2018-11-29 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/22957
  
This looks good to me. Just a comment about wording.


---




[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...

2018-11-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22957#discussion_r237747550
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 ---
@@ -195,14 +195,35 @@ abstract class Expression extends 
TreeNode[Expression] {
   }
 
   /**
-   * Returns true when two expressions will always compute the same 
result, even if they differ
+   * Returns true when two expressions will always compute the same 
output, even if they differ
* cosmetically (i.e. capitalization of names in attributes may be 
different).
*
* See [[Canonicalize]] for more details.
+   *
+   * This method should be used (instead of `sameResult`) when comparing 
if 2 expressions are the
+   * same and one can replace the other (eg. in Optimizer/Analyzer rules 
where we want to replace
+   * equivalent expressions). It should not be used (and `sameResult` 
should be used instead) when
+   * comparing if 2 expressions produce the same results (in this case 
`semanticEquals` can be too
+   * strict).
*/
   def semanticEquals(other: Expression): Boolean =
 deterministic && other.deterministic && canonicalized == 
other.canonicalized
 
+  /**
+   * Returns true when two expressions will always compute the same 
result, even if the output may
+   * be different, because of different names or similar differences.
--- End diff --

I think here `output` is a bit confusing. Do we mean the output names?


---




[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...

2018-11-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22957#discussion_r237747770
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 ---
@@ -195,14 +195,35 @@ abstract class Expression extends 
TreeNode[Expression] {
   }
 
   /**
-   * Returns true when two expressions will always compute the same 
result, even if they differ
+   * Returns true when two expressions will always compute the same 
output, even if they differ
* cosmetically (i.e. capitalization of names in attributes may be 
different).
*
* See [[Canonicalize]] for more details.
+   *
+   * This method should be used (instead of `sameResult`) when comparing 
if 2 expressions are the
+   * same and one can replace the other (eg. in Optimizer/Analyzer rules 
where we want to replace
+   * equivalent expressions). It should not be used (and `sameResult` 
should be used instead) when
+   * comparing if 2 expressions produce the same results (in this case 
`semanticEquals` can be too
+   * strict).
*/
   def semanticEquals(other: Expression): Boolean =
 deterministic && other.deterministic && canonicalized == 
other.canonicalized
 
+  /**
+   * Returns true when two expressions will always compute the same 
result, even if the output may
+   * be different, because of different names or similar differences.
--- End diff --

So `sameResult` returns whether the evaluated results of two expressions are 
exactly the same?
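
To make the question concrete, here is a minimal sketch of one possible reading of the two doc comments. It is a toy model only: `Attr`, `Alias`, `Add` and `ExprCompare` are hypothetical names, not Spark's `Expression` API, and the normalization rules are assumptions.

```scala
// Toy expression tree; hypothetical types, not Spark's Expression classes.
sealed trait Expr
final case class Attr(name: String, id: Long) extends Expr
final case class Alias(child: Expr, name: String) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr

object ExprCompare {
  // Cosmetic canonicalization: drop attribute names (and so their capitalization),
  // but keep alias names, because they are part of the output.
  def canonicalize(e: Expr): Expr = e match {
    case Attr(_, id) => Attr("none", id)
    case Alias(c, n) => Alias(canonicalize(c), n)
    case Add(l, r)   => Add(canonicalize(l), canonicalize(r))
  }

  // Looser normalization: also strip aliases, since renaming does not change values.
  def stripAliases(e: Expr): Expr = e match {
    case Alias(c, _) => stripAliases(c)
    case Add(l, r)   => Add(stripAliases(l), stripAliases(r))
    case a: Attr     => a
  }

  // Strict: one expression can replace the other, output names included.
  def semanticEquals(a: Expr, b: Expr): Boolean =
    canonicalize(a) == canonicalize(b)

  // Loose: the evaluated values are the same even if the output names differ.
  def sameResult(a: Expr, b: Expr): Boolean =
    canonicalize(stripAliases(a)) == canonicalize(stripAliases(b))
}
```

Under these assumptions, `Alias(Attr("a", 1), "x")` and `Alias(Attr("A", 1), "y")` are not `semanticEquals`, but they do satisfy `sameResult`.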


---




[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-11-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r237747152
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -181,62 +180,39 @@ case class RelationConversions(
 conf: SQLConf,
 sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
   private def isConvertible(relation: HiveTableRelation): Boolean = {
-val serde = 
relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-serde.contains("parquet") && 
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
-  serde.contains("orc") && 
conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
+isConvertible(relation.tableMeta)
   }
 
-  // Return true for Apache ORC and Hive ORC-related configuration names.
-  // Note that Spark doesn't support configurations like 
`hive.merge.orcfile.stripe.level`.
-  private def isOrcProperty(key: String) =
-key.startsWith("orc.") || key.contains(".orc.")
-
-  private def isParquetProperty(key: String) =
-key.startsWith("parquet.") || key.contains(".parquet.")
-
-  private def convert(relation: HiveTableRelation): LogicalRelation = {
-val serde = 
relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-
-// Consider table and storage properties. For properties existing in 
both sides, storage
-// properties will supersede table properties.
-if (serde.contains("parquet")) {
-  val options = 
relation.tableMeta.properties.filterKeys(isParquetProperty) ++
-relation.tableMeta.storage.properties + 
(ParquetOptions.MERGE_SCHEMA ->
-
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
-  sessionCatalog.metastoreCatalog
-.convertToLogicalRelation(relation, options, 
classOf[ParquetFileFormat], "parquet")
-} else {
-  val options = 
relation.tableMeta.properties.filterKeys(isOrcProperty) ++
-relation.tableMeta.storage.properties
-  if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
-sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-  relation,
-  options,
-  
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
-  "orc")
-  } else {
-sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-  relation,
-  options,
-  classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
-  "orc")
-  }
-}
+  private def isConvertible(tableMeta: CatalogTable): Boolean = {
+val serde = 
tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+serde.contains("parquet") && 
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
+  serde.contains("orc") && 
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
+  private val metastoreCatalog = sessionCatalog.metastoreCatalog
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
 plan resolveOperators {
   // Write path
   case InsertIntoTable(r: HiveTableRelation, partition, query, 
overwrite, ifPartitionNotExists)
 // Inserting into partitioned table is not supported in 
Parquet/Orc data source (yet).
   if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
 !r.isPartitioned && isConvertible(r) =>
-InsertIntoTable(convert(r), partition, query, overwrite, 
ifPartitionNotExists)
+InsertIntoTable(metastoreCatalog.convert(r), partition,
+  query, overwrite, ifPartitionNotExists)
 
   // Read path
   case relation: HiveTableRelation
   if DDLUtils.isHiveTable(relation.tableMeta) && 
isConvertible(relation) =>
-convert(relation)
+metastoreCatalog.convert(relation)
+
+  // CTAS
+  case CreateTable(tableDesc, mode, Some(query))
+  if DDLUtils.isHiveTable(tableDesc) && 
tableDesc.partitionColumnNames.isEmpty &&
+isConvertible(tableDesc) =>
+DDLUtils.checkDataColNames(tableDesc)
--- End diff --

HiveAnalysis does this too when transforming CreateTable to 
CreateHiveTableAsSelectCommand. checkDataColNames checks whether any invalid 
character is used in a column name.


---




[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...

2018-11-29 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/22514
  
https://user-images.githubusercontent.com/68855/49268483-aaa6d000-f49a-11e8-92c3-5ee78012fe9e.png



---




[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...

2018-11-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23152#discussion_r237721273
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -879,13 +879,13 @@ case class ColumnStatsMap(originalMap: 
AttributeMap[ColumnStat]) {
   }
 
   def hasCountStats(a: Attribute): Boolean =
-get(a).map(_.hasCountStats).getOrElse(false)
+get(a).exists(_.hasCountStats)
 
   def hasDistinctCount(a: Attribute): Boolean =
-get(a).map(_.distinctCount.isDefined).getOrElse(false)
+get(a).exists(_.distinctCount.isDefined)
 
   def hasMinMaxStats(a: Attribute): Boolean =
-get(a).map(_.hasCountStats).getOrElse(false)
+get(a).exists(_.hasMinMaxStats)
--- End diff --

@liancheng Thanks. @adrian-wang Can you also add a test based on that?


---




[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...

2018-11-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23152#discussion_r237720737
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -879,13 +879,13 @@ case class ColumnStatsMap(originalMap: 
AttributeMap[ColumnStat]) {
   }
 
   def hasCountStats(a: Attribute): Boolean =
-get(a).map(_.hasCountStats).getOrElse(false)
+get(a).exists(_.hasCountStats)
 
   def hasDistinctCount(a: Attribute): Boolean =
-get(a).map(_.distinctCount.isDefined).getOrElse(false)
+get(a).exists(_.distinctCount.isDefined)
 
   def hasMinMaxStats(a: Attribute): Boolean =
-get(a).map(_.hasCountStats).getOrElse(false)
+get(a).exists(_.hasMinMaxStats)
--- End diff --

So this is still an actual bug.
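
A minimal sketch of why the old check can give a different answer from the new one. `ToyColumnStat` and `ToyStats` are hypothetical stand-ins, not Spark's `ColumnStat` or `ColumnStatsMap`; the assumption is a column (for example one holding only nulls) whose count stats are present while min and max are missing.

```scala
// Hypothetical, simplified statistics record; not Spark's ColumnStat.
final case class ToyColumnStat(
    nullCount: Option[Long] = None,
    distinctCount: Option[Long] = None,
    min: Option[Int] = None,
    max: Option[Int] = None) {
  def hasCountStats: Boolean = nullCount.isDefined && distinctCount.isDefined
  def hasMinMaxStats: Boolean = min.isDefined && max.isDefined
}

object ToyStats {
  // Old shape of the check: it consults hasCountStats, not hasMinMaxStats.
  def hasMinMaxOld(stat: Option[ToyColumnStat]): Boolean =
    stat.map(_.hasCountStats).getOrElse(false)

  // New shape: Option.exists is equivalent to map(...).getOrElse(false),
  // and the predicate now asks about min/max.
  def hasMinMaxNew(stat: Option[ToyColumnStat]): Boolean =
    stat.exists(_.hasMinMaxStats)
}
```

With `ToyColumnStat(nullCount = Some(1L), distinctCount = Some(0L))` the old check answers true and the new one answers false; for a missing entry (`None`) both answer false.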


---




[GitHub] spark issue #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of `Colum...

2018-11-29 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/23152
  
Agreed. But it looks like the added test failed.


---




[GitHub] spark pull request #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts,...

2018-11-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23171#discussion_r237402055
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 ---
@@ -335,6 +343,41 @@ case class In(value: Expression, list: 
Seq[Expression]) extends Predicate {
""".stripMargin)
   }
 
+  private def genCodeWithSwitch(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
+val (nullLiterals, nonNullLiterals) = list.partition {
+  case Literal(null, _) => true
+  case _ => false
+}
+val listGen = nonNullLiterals.map(_.genCode(ctx))
+val valueGen = value.genCode(ctx)
+
+val caseBranches = listGen.map(literal =>
--- End diff --

style:

```scala
listGen.map { literal =>
  ...
}
```


---




[GitHub] spark pull request #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts,...

2018-11-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23171#discussion_r237405465
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 ---
@@ -335,6 +343,41 @@ case class In(value: Expression, list: 
Seq[Expression]) extends Predicate {
""".stripMargin)
   }
 
+  private def genCodeWithSwitch(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
+val (nullLiterals, nonNullLiterals) = list.partition {
+  case Literal(null, _) => true
+  case _ => false
+}
+val listGen = nonNullLiterals.map(_.genCode(ctx))
+val valueGen = value.genCode(ctx)
+
+val caseBranches = listGen.map(literal =>
+  s"""
+ |case ${literal.value}:
+ |  ${ev.value} = true;
+ |  break;
+   """.stripMargin)
+
+ev.copy(code =
+  code"""
+ |${valueGen.code}
+ |${CodeGenerator.JAVA_BOOLEAN} ${ev.isNull} = ${valueGen.isNull};
+ |${CodeGenerator.JAVA_BOOLEAN} ${ev.value} = false;
+ |if (!${valueGen.isNull}) {
+ |  switch (${valueGen.value}) {
+ |${caseBranches.mkString("")}
+ |default:
+ |  ${ev.isNull} = ${nullLiterals.nonEmpty};
+ |  }
+ |}
+   """.stripMargin)
+  }
+
+  private def isSwitchCompatible: Boolean = list.forall {
+case Literal(_, dt) => dt == ByteType || dt == ShortType || dt == 
IntegerType
--- End diff --

Can this be simplified to the following?

```scala
private def isSwitchCompatible: Boolean = {
  inSetConvertible && (value.dataType == ByteType || value.dataType == ShortType ||
    value.dataType == IntegerType)
}
```


---




[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...

2018-11-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23176#discussion_r237398085
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 ---
@@ -367,11 +367,29 @@ case class InSet(child: Expression, hset: Set[Any]) 
extends UnaryExpression with
   }
 
   @transient lazy val set: Set[Any] = child.dataType match {
-case _: AtomicType => hset
+case t: AtomicType if !t.isInstanceOf[BinaryType] => hset
 case _: NullType => hset
 case _ =>
+  val ord = TypeUtils.getInterpretedOrdering(child.dataType)
+  val ordering = if (hasNull) {
+new Ordering[Any] {
+  override def compare(x: Any, y: Any): Int = {
--- End diff --

Or simply filter out null from the tree set, as in @cloud-fan's idea.
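
A rough sketch of that alternative: keep the non-null values in an ordered set and track null separately. `BinaryInSetSketch` and its byte-wise ordering are assumptions for illustration, not necessarily the ordering Spark uses for binary values.

```scala
import scala.collection.immutable.TreeSet

object BinaryInSetSketch {
  // Lexicographic ordering on unsigned bytes (an assumption for this sketch).
  val byteArrayOrdering: Ordering[Array[Byte]] = new Ordering[Array[Byte]] {
    override def compare(x: Array[Byte], y: Array[Byte]): Int = {
      val n = math.min(x.length, y.length)
      var i = 0
      var result = 0
      while (result == 0 && i < n) {
        result = (x(i) & 0xff) - (y(i) & 0xff)
        i += 1
      }
      if (result != 0) result else x.length - y.length
    }
  }

  // Returns (hasNull, ordered set of the non-null values). Because null never
  // enters the set, the ordering does not have to handle it.
  def build(values: Seq[Array[Byte]]): (Boolean, TreeSet[Array[Byte]]) = {
    val hasNull = values.exists(_ == null)
    val nonNull = values.filter(_ != null)
    (hasNull, TreeSet(nonNull: _*)(byteArrayOrdering))
  }
}
```

The ordered set compares arrays by content, whereas a plain `Set[Array[Byte]]` falls back to reference equality and misses an equal-content array.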


---




[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...

2018-11-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23176#discussion_r237397442
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 ---
@@ -367,11 +367,29 @@ case class InSet(child: Expression, hset: Set[Any]) 
extends UnaryExpression with
   }
 
   @transient lazy val set: Set[Any] = child.dataType match {
-case _: AtomicType => hset
+case t: AtomicType if !t.isInstanceOf[BinaryType] => hset
 case _: NullType => hset
 case _ =>
+  val ord = TypeUtils.getInterpretedOrdering(child.dataType)
+  val ordering = if (hasNull) {
+new Ordering[Any] {
+  override def compare(x: Any, y: Any): Int = {
--- End diff --

InSet overrides nullSafeEval, and for codegen we look into `set` only if 
`!ev.isNull`, so I think we only need to consider the case where one side is 
null.
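
For illustration, a small sketch of an ordering wrapper that tolerates null. `NullAwareOrdering.nullsFirst` is a hypothetical helper, not code from the PR; it also covers the both-null case for completeness, even though the argument above is that a lookup never passes null.

```scala
object NullAwareOrdering {
  // Wraps an ordering so that null sorts before every non-null value.
  def nullsFirst[T <: AnyRef](ord: Ordering[T]): Ordering[T] = new Ordering[T] {
    override def compare(x: T, y: T): Int = {
      if (x == null && y == null) 0
      else if (x == null) -1
      else if (y == null) 1
      else ord.compare(x, y) // both sides non-null: defer to the wrapped ordering
    }
  }
}
```

For example, sorting `List("b", null, "a")` with `NullAwareOrdering.nullsFirst(Ordering.String)` places the null first and then the strings in their usual order.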


---




[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...

2018-11-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23152#discussion_r237382531
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -879,13 +879,13 @@ case class ColumnStatsMap(originalMap: 
AttributeMap[ColumnStat]) {
   }
 
   def hasCountStats(a: Attribute): Boolean =
-get(a).map(_.hasCountStats).getOrElse(false)
+get(a).exists(_.hasCountStats)
 
   def hasDistinctCount(a: Attribute): Boolean =
-get(a).map(_.distinctCount.isDefined).getOrElse(false)
+get(a).exists(_.distinctCount.isDefined)
 
   def hasMinMaxStats(a: Attribute): Boolean =
-get(a).map(_.hasCountStats).getOrElse(false)
+get(a).exists(_.hasMinMaxStats)
--- End diff --

Based on https://github.com/apache/spark/pull/23152#discussion_r237359357, 
the old `hasMinMaxStats` actually works correctly, although at a glance it 
looks like it does not.


---




[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...

2018-11-28 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23152#discussion_r237381966
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -821,6 +822,32 @@ class FilterEstimationSuite extends 
StatsEstimationTestBase {
   expectedRowCount = 3)
   }
 
+  test("ColumnStatsMap tests") {
--- End diff --

Oh I see. So `FilterEstimation` works well without this change. 


---




[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...

2018-11-28 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/22514
  
> can we try a query and see what the SQL UI looks like?

Yes. I will try a query and post the SQL UI.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-11-28 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r237364946
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala ---
@@ -92,4 +92,18 @@ class HiveParquetSuite extends QueryTest with 
ParquetTest with TestHiveSingleton
   }
 }
   }
+
+  test("SPARK-25271: write empty map into hive parquet table") {
--- End diff --

I agree. Now that we have two Hive CTAS commands, it is easier to test. I 
will add tests later.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...

2018-11-28 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23152#discussion_r237359890
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -821,6 +822,32 @@ class FilterEstimationSuite extends 
StatsEstimationTestBase {
   expectedRowCount = 3)
   }
 
+  test("ColumnStatsMap tests") {
--- End diff --

If `hasMinMaxStats` was not correct previously, would `evaluateBinary` work 
correctly?


---




[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...

2018-11-28 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23152#discussion_r237347496
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -821,6 +822,32 @@ class FilterEstimationSuite extends 
StatsEstimationTestBase {
   expectedRowCount = 3)
   }
 
+  test("ColumnStatsMap tests") {
--- End diff --

Since this is not correct, I think that FilterEstimation didn't work 
previously. Can you also add a related test using validateEstimatedStats 
similar to other tests? In the test, we should rely on min and max stats to do 
filter estimation.
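
As a rough illustration of what relying on min and max stats means, here is a toy range estimate under a uniform-distribution assumption. `RangeSelectivitySketch.lessThan` is a hypothetical helper and is not how `FilterEstimation` is implemented.

```scala
object RangeSelectivitySketch {
  // Estimated fraction of non-null rows satisfying `col < v`, assuming the values
  // are spread uniformly over [min, max]. Returns None when min or max is missing,
  // which is the case a hasMinMaxStats-style check is meant to detect.
  def lessThan(min: Option[Double], max: Option[Double], v: Double): Option[Double] =
    for {
      lo <- min
      hi <- max
    } yield {
      if (v <= lo) 0.0
      else if (v > hi) 1.0
      else (v - lo) / (hi - lo) // here lo < v <= hi, so hi - lo is positive
    }
}
```

With min 0.0 and max 10.0, a predicate `col < 5.0` is estimated to keep half of the rows; with no min/max available the helper declines to estimate.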


---




[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...

2018-11-28 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/22514
  
Yeah, let's see if the retest works well.


---



