[GitHub] spark pull request #23275: [SPARK-26323][SQL] Scala UDF should still check i...

2018-12-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23275#discussion_r240234583
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
 ---
@@ -88,68 +88,49 @@ sealed trait UserDefinedFunction {
 private[sql] case class SparkUserDefinedFunction(
 f: AnyRef,
 dataType: DataType,
-inputTypes: Option[Seq[DataType]],
-nullableTypes: Option[Seq[Boolean]],
+inputSchemas: Seq[Option[ScalaReflection.Schema]],
 name: Option[String] = None,
 nullable: Boolean = true,
 deterministic: Boolean = true) extends UserDefinedFunction {
 
   @scala.annotation.varargs
-  override def apply(exprs: Column*): Column = {
-// TODO: make sure this class is only instantiated through `SparkUserDefinedFunction.create()`
-// and `nullableTypes` is always set.
-if (inputTypes.isDefined) {
-  assert(inputTypes.get.length == nullableTypes.get.length)
-}
-
-val inputsNullSafe = nullableTypes.getOrElse {
-  ScalaReflection.getParameterTypeNullability(f)
--- End diff --

It's not worth keeping this anymore, as Scala 2.12 is the default now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23228: [MINOR][DOC] Update the condition description of seriali...

2018-12-10 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23228
  
thanks, merging to master!


---




[GitHub] spark pull request #23275: [SPARK-26323][SQL] Scala UDF should still check i...

2018-12-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23275#discussion_r240231883
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -4255,11 +4255,11 @@ object functions {
*
* @group udf_funcs
* @since 2.0.0
+   *
+   * @deprecated("please use the typed `udf` methods", "3.0.0")
--- End diff --

With Scala 2.12, type and nullability info needs to be retrieved at 
compile time, so this method is not very useful and we should deprecate it.


---




[GitHub] spark issue #23275: [SPARK-26323][SQL] Scala UDF should still check input ty...

2018-12-10 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23275
  
cc @maryannxue @gatorsmile @srowen 


---




[GitHub] spark pull request #23275: [SPARK-26323][SQL] Scala UDF should still check i...

2018-12-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23275#discussion_r240230970
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
 ---
@@ -47,25 +47,13 @@ case class ScalaUDF(
 function: AnyRef,
 dataType: DataType,
 children: Seq[Expression],
-inputsNullSafe: Seq[Boolean],
-inputTypes: Seq[DataType] = Nil,
+@transient inputsNullSafe: Seq[Boolean],
+@transient inputTypes: Seq[AbstractDataType] = Nil,
 udfName: Option[String] = None,
 nullable: Boolean = true,
 udfDeterministic: Boolean = true)
   extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression {
 
-  // The constructor for SPARK 2.1 and 2.2
--- End diff --

not useful anymore in Spark 3.0.


---




[GitHub] spark pull request #23275: [SPARK-26323][SQL] Scala UDF should still check i...

2018-12-10 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/23275

[SPARK-26323][SQL] Scala UDF should still check input types even if some inputs are of type Any

## What changes were proposed in this pull request?

For Scala UDF, when checking input nullability, we will skip inputs with 
type `Any`, and only check the inputs that provide nullability info.

We should do the same for checking input types.
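
A minimal sketch of the idea, using simplified stand-ins rather than Spark's real classes (`SimpleType`, `Schema`, and `UdfInputs` are hypothetical names chosen for illustration): an input declared as `Any` carries no schema, so it maps to a wildcard type and is assumed to accept null, while every other input keeps its own type and nullability.

```scala
// Hypothetical stand-ins for Spark's data types; not the real API.
sealed trait SimpleType
case object IntType extends SimpleType
case object StrType extends SimpleType
case object AnyType extends SimpleType // wildcard: accept any input type

// Type and nullability captured for one UDF parameter.
final case class Schema(dataType: SimpleType, nullable: Boolean)

object UdfInputs {
  // None stands for a parameter declared as Any, for which no schema is known.
  def inputTypes(schemas: Seq[Option[Schema]]): Seq[SimpleType] =
    schemas.map(_.map(_.dataType).getOrElse(AnyType))

  // Parameters without a schema are assumed to accept null.
  def inputsNullSafe(schemas: Seq[Option[Schema]]): Seq[Boolean] =
    schemas.map(_.map(_.nullable).getOrElse(true))
}
```

With this shape, both checks are derived from the same per-parameter list, so an `Any` parameter no longer disables type checking for the remaining parameters.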

## How was this patch tested?

new tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark udf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23275.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23275


commit 8582607195f12a4c133fb28b59e8a7fce7a97fbb
Author: Wenchen Fan 
Date:   2018-12-10T13:00:17Z

Scala UDF should still check input types even if some inputs are of type Any




---




[GitHub] spark pull request #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesM...

2018-12-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23272#discussion_r240189245
  
--- Diff: 
core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
 ---
@@ -667,4 +669,54 @@ public void testPeakMemoryUsed() {
 }
   }
 
+  @Test
+  public void avoidDeadlock() throws InterruptedException {
+memoryManager.limit(PAGE_SIZE_BYTES);
+MemoryMode mode = useOffHeapMemoryAllocator() ? MemoryMode.OFF_HEAP: MemoryMode.ON_HEAP;
+TestMemoryConsumer c1 = new TestMemoryConsumer(taskMemoryManager, mode);
+BytesToBytesMap map =
+  new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024);
+
+Runnable memoryConsumer = new Runnable() {
+  @Override
+  public void run() {
+int i = 0;
+long used = 0;
+while (i < 10) {
+  c1.use(1000);
+  used += 1000;
+  i++;
+}
+c1.free(used);
+  }
+};
+
+Thread thread = new Thread(memoryConsumer);
+
+try {
+  int i;
+  for (i = 0; i < 1024; i++) {
+final long[] arr = new long[]{i};
+final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
+loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+  }
+
+  // Starts to require memory at another memory consumer.
+  thread.start();
+
+  BytesToBytesMap.MapIterator iter = map.destructiveIterator();
+  for (i = 0; i < 1024; i++) {
+iter.next();
+  }
+  assertFalse(iter.hasNext());
+} finally {
+  map.free();
+  thread.join();
--- End diff --

Is this line where the test hangs without the fix?


---




[GitHub] spark issue #23251: [SPARK-26300][SS] Remove a redundant `checkForStreaming`...

2018-12-10 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23251
  
cc @zsxwing 


---




[GitHub] spark issue #23262: [SPARK-26312][SQL]Converting converters in RDDConversion...

2018-12-10 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23262
  
LGTM, can you update the PR title and description?


---




[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...

2018-12-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23262#discussion_r240180713
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ---
@@ -416,7 +416,12 @@ case class DataSourceStrategy(conf: SQLConf) extends 
Strategy with Logging with
   output: Seq[Attribute],
   rdd: RDD[Row]): RDD[InternalRow] = {
 if (relation.relation.needConversion) {
-  execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
+  val converters = RowEncoder(StructType.fromAttributes(output))
+  rdd.mapPartitions { iterator =>
+iterator.map { r =>
--- End diff --

nit: `iterator.map(converters.toRow)`
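
To illustrate the nit, here is a small self-contained sketch showing that passing the method directly is equivalent to wrapping it in an explicit lambda. `Converter` and its `toRow` are hypothetical stand-ins for the converter in the diff, not Spark's `RowEncoder`.

```scala
object EtaExpansionExample {
  // Hypothetical converter standing in for the row converter in the diff.
  final class Converter {
    def toRow(value: Int): String = s"row($value)"
  }

  // Explicit lambda, as written in the diff.
  def convertVerbose(it: Iterator[Int], c: Converter): Iterator[String] =
    it.map { r => c.toRow(r) }

  // Same behaviour: the method is passed directly and lifted to a function.
  def convertConcise(it: Iterator[Int], c: Converter): Iterator[String] =
    it.map(c.toRow)
}
```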


---




[GitHub] spark issue #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesMap.MapI...

2018-12-10 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23272
  
Have you seen any bug reports caused by this deadlock?


---




[GitHub] spark pull request #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesM...

2018-12-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23272#discussion_r240178993
  
--- Diff: 
core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java ---
@@ -38,12 +38,14 @@ public long spill(long size, MemoryConsumer trigger) 
throws IOException {
 return used;
   }
 
-  void use(long size) {
+  @VisibleForTesting
--- End diff --

This is a test class, so we can just make it public.



---




[GitHub] spark pull request #23204: Revert "[SPARK-21052][SQL] Add hash map metrics t...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23204#discussion_r240104812
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala ---
@@ -213,10 +213,6 @@ trait HashJoin {
   s"BroadcastHashJoin should not take $x as the JoinType")
 }
 
-// At the end of the task, we update the avg hash probe.
-TaskContext.get().addTaskCompletionListener[Unit](_ =>
--- End diff --

In this file, the `join` method takes `avgHashProbe: SQLMetric`; we should 
remove that parameter.


---




[GitHub] spark issue #23255: [SPARK-26307] [SQL] Fix CTAS when INSERT a partitioned t...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23255
  
thanks, merging to master/2.4/2.3!


---




[GitHub] spark issue #23211: [SPARK-19712][SQL] Move PullupCorrelatedPredicates and R...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23211
  
to make the PR smaller, can we add an individual rule 
`PushdownLeftSemiOrAntiJoin` first?


---




[GitHub] spark pull request #23211: [SPARK-19712][SQL] Move PullupCorrelatedPredicate...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23211#discussion_r240097479
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -984,6 +1002,28 @@ object PushDownPredicate extends Rule[LogicalPlan] 
with PredicateHelper {
 
   project.copy(child = Filter(replaceAlias(condition, aliasMap), 
grandChild))
 
+// Similar to the above Filter over Project
+// LeftSemi/LeftAnti over Project
+case join @ Join(p @ Project(pList, grandChild), rightOp, LeftSemiOrAnti(joinType), joinCond)
--- End diff --

Shall we create a new rule `PushdownLeftSemiOrAntiJoin`?


---




[GitHub] spark pull request #23211: [SPARK-19712][SQL] Move PullupCorrelatedPredicate...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23211#discussion_r240097255
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -649,13 +664,16 @@ object CollapseProject extends Rule[LogicalPlan] {
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
 case p1 @ Project(_, p2: Project) =>
-  if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList)) {
+  if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) ||
+ScalarSubquery.hasScalarSubquery(p1.projectList) ||
+ScalarSubquery.hasScalarSubquery(p2.projectList)) {
--- End diff --

Why did we allow it before?


---




[GitHub] spark issue #23204: Revert "[SPARK-21052][SQL] Add hash map metrics to join"

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23204
  
can we follow 
https://github.com/apache/spark/pull/23204#issuecomment-445510026 and create a 
new ticket?


---




[GitHub] spark pull request #23211: [SPARK-19712][SQL] Move PullupCorrelatedPredicate...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23211#discussion_r240092936
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 ---
@@ -267,6 +267,17 @@ object ScalarSubquery {
   case _ => false
 }.isDefined
   }
+
+  def hasScalarSubquery(e: Expression): Boolean = {
+e.find {
+  case s: ScalarSubquery => true
+  case _ => false
+}.isDefined
+  }
+
+  def hasScalarSubquery(e: Seq[Expression]): Boolean = {
+e.find(hasScalarSubquery(_)).isDefined
--- End diff --

`e.exists(hasScalarSubquery)`
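
As a quick illustration of why the shorter form is equivalent, the sketch below compares `find(...).isDefined` with `exists` on a plain `Seq`. The predicate `hasMarker` is a hypothetical stand-in for `hasScalarSubquery(e: Expression)`; strings replace expressions so the example runs without Spark.

```scala
object ExistsExample {
  // Hypothetical predicate standing in for `hasScalarSubquery(e: Expression)`.
  def hasMarker(s: String): Boolean = s.contains("subquery")

  // Form used in the diff: search for a match, then test whether one was found.
  def viaFind(xs: Seq[String]): Boolean = xs.find(hasMarker(_)).isDefined

  // Suggested form: states the intent directly and builds no intermediate Option.
  def viaExists(xs: Seq[String]): Boolean = xs.exists(hasMarker)
}
```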


---




[GitHub] spark issue #23248: [SPARK-26293][SQL] Cast exception when having python udf...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23248
  
If it's fine for 2.4, I think it's also fine for master as a temporary fix? 
We can create another ticket to clean up the subquery optimization hack. IIUC 
https://github.com/apache/spark/pull/23211 may help with it.


---




[GitHub] spark pull request #23258: [SPARK-23375][SQL][FOLLOWUP][TEST] Test Sort metr...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23258#discussion_r240090371
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 ---
@@ -182,10 +182,13 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
   }
 
   test("Sort metrics") {
-// Assume the execution plan is
-// WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1))
-val ds = spark.range(10).sort('id)
-testSparkPlanMetrics(ds.toDF(), 2, Map.empty)
+// Assume the execution plan with node id is
+// Sort(nodeId = 0)
+//   Exchange(nodeId = 1)
+// LocalTableScan(nodeId = 2)
+val df = Seq(1, 3, 2).toDF("id").sort('id)
+testSparkPlanMetrics(df, 2, Map.empty)
--- End diff --

Can we just check something like `sortTime > 0`?
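
One way such a check could look, sketched without Spark: since timing metrics vary from run to run, each expected metric is expressed as a predicate instead of an exact value. `MetricPredicates.allHold` is a hypothetical helper, not an existing test utility, and the metric names are only examples.

```scala
object MetricPredicates {
  // Hypothetical helper: each expected metric is a predicate rather than an exact value.
  // A metric that is missing from `actual` counts as a failed check.
  def allHold(actual: Map[String, Long], expected: Map[String, Long => Boolean]): Boolean =
    expected.forall { case (name, holds) => actual.get(name).exists(holds) }
}
```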


---




[GitHub] spark pull request #23201: [SPARK-26246][SQL] Infer date and timestamp types...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23201#discussion_r240090192
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 ---
@@ -121,7 +122,26 @@ private[sql] class JsonInferSchema(options: 
JSONOptions) extends Serializable {
 DecimalType(bigDecimal.precision, bigDecimal.scale)
 }
 decimalTry.getOrElse(StringType)
-  case VALUE_STRING => StringType
+  case VALUE_STRING =>
+val stringValue = parser.getText
--- End diff --

The partition feature is shared between all the file-based sources, so I think 
it's overkill to make it behave differently for different data sources.

The simplest solution to me is to ask all text sources to follow the 
behavior of partition value type inference.


---




[GitHub] spark pull request #23265: [2.4][SPARK-26021][SQL][FOLLOWUP] only deal with ...

2018-12-09 Thread cloud-fan
Github user cloud-fan closed the pull request at:

https://github.com/apache/spark/pull/23265


---




[GitHub] spark issue #23228: [MINOR][DOC] Update the condition description of seriali...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23228
  
retest this please


---




[GitHub] spark issue #23204: Revert "[SPARK-21052][SQL] Add hash map metrics to join"

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23204
  
If we can quickly finish #23214 (within several days), let's go for it. But 
if we can't, I'd suggest we do the partial revert first to fix the perf 
regression, and add back the metrics later.


---




[GitHub] spark issue #23228: [MINOR][DOC]The condition description of serialized shuf...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23228
  
LGTM, cc @jiangxb1987 


---




[GitHub] spark pull request #23228: [MINOR][DOC]The condition description of serializ...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23228#discussion_r240036698
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---
@@ -33,10 +33,10 @@ import org.apache.spark.shuffle._
  * Sort-based shuffle has two different write paths for producing its map 
output files:
  *
  *  - Serialized sorting: used when all three of the following conditions 
hold:
- *1. The shuffle dependency specifies no aggregation or output 
ordering.
+ *1. The shuffle dependency specifies no map-side combine.
--- End diff --

looks right to me, according to 
https://github.com/apache/spark/blob/d5dadbf30d5429c36ec3d5c2845a71c2717fd6f3/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala#L195


---




[GitHub] spark issue #23253: [SPARK-26303][SQL] Return partial results for bad JSON r...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23253
  
LGTM except a code style comment


---




[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23253#discussion_r240036498
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -347,17 +347,28 @@ class JacksonParser(
   schema: StructType,
   fieldConverters: Array[ValueConverter]): InternalRow = {
 val row = new GenericInternalRow(schema.length)
+var badRecordException: Option[Throwable] = None
+
 while (nextUntil(parser, JsonToken.END_OBJECT)) {
   schema.getFieldIndex(parser.getCurrentName) match {
 case Some(index) =>
-  row.update(index, fieldConverters(index).apply(parser))
-
+  try {
+row.update(index, fieldConverters(index).apply(parser))
+  } catch {
+case NonFatal(e) =>
+  badRecordException = badRecordException.orElse(Some(e))
+  parser.skipChildren()
+  }
 case None =>
   parser.skipChildren()
   }
 }
 
-row
+if (badRecordException.isEmpty) {
+  row
+} else {
+  throw BadRecordException(() => UTF8String.EMPTY_UTF8, () => Some(row), badRecordException.get)
--- End diff --

or we can create a new exception type and use it here, which only carries 
the row and the exception
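
A rough sketch of that alternative, under simplifying assumptions: `PartialResultException` and `PartialParse` are illustrative names, and a `Vector[Option[Int]]` stands in for the internal row. The parser keeps going after a bad field, remembers only the first failure, and throws an exception that carries just the partially filled row and the cause.

```scala
import scala.util.control.NonFatal

// Hypothetical exception type: carries only the partially filled row and the first failure.
final case class PartialResultException(partialRow: Vector[Option[Int]], cause: Throwable)
  extends Exception(cause)

object PartialParse {
  // Converts each field; a bad field is left empty instead of aborting the whole record.
  def parseRow(fields: Seq[String]): Vector[Option[Int]] = {
    var firstError: Option[Throwable] = None
    val row = fields.map { f =>
      try Some(f.trim.toInt)
      catch {
        case NonFatal(e) =>
          firstError = firstError.orElse(Some(e)) // keep only the first failure
          None
      }
    }.toVector
    firstError match {
      case Some(e) => throw PartialResultException(row, e)
      case None    => row
    }
  }
}
```

A caller that does know the original record could catch this exception and wrap it together with the record text, which keeps the parser free of placeholder values.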


---




[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23253#discussion_r240036489
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -347,17 +347,28 @@ class JacksonParser(
   schema: StructType,
   fieldConverters: Array[ValueConverter]): InternalRow = {
 val row = new GenericInternalRow(schema.length)
+var badRecordException: Option[Throwable] = None
+
 while (nextUntil(parser, JsonToken.END_OBJECT)) {
   schema.getFieldIndex(parser.getCurrentName) match {
 case Some(index) =>
-  row.update(index, fieldConverters(index).apply(parser))
-
+  try {
+row.update(index, fieldConverters(index).apply(parser))
+  } catch {
+case NonFatal(e) =>
+  badRecordException = badRecordException.orElse(Some(e))
+  parser.skipChildren()
+  }
 case None =>
   parser.skipChildren()
   }
 }
 
-row
+if (badRecordException.isEmpty) {
+  row
+} else {
+  throw BadRecordException(() => UTF8String.EMPTY_UTF8, () => Some(row), badRecordException.get)
--- End diff --

Add a comment saying that we don't know the original record here, and that 
it will be filled in later.


---




[GitHub] spark pull request #23201: [SPARK-26246][SQL] Infer date and timestamp types...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23201#discussion_r240036225
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 ---
@@ -121,7 +122,26 @@ private[sql] class JsonInferSchema(options: 
JSONOptions) extends Serializable {
 DecimalType(bigDecimal.precision, bigDecimal.scale)
 }
 decimalTry.getOrElse(StringType)
-  case VALUE_STRING => StringType
+  case VALUE_STRING =>
+val stringValue = parser.getText
--- End diff --

do you mean partition value type inference will have a different result 
than json value type inference?


---




[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23208
  
Let's move the high level discussion to the doc: 
https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing


---




[GitHub] spark pull request #23266: [SPARK-26313][SQL] move read related methods from...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23266#discussion_r240029373
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java 
---
@@ -20,14 +20,27 @@
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.sources.v2.reader.Scan;
 import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
 
 /**
- * An empty mix-in interface for {@link Table}, to indicate this table 
supports batch scan.
- * 
- * If a {@link Table} implements this interface, its {@link 
Table#newScanBuilder(DataSourceOptions)}
- * must return a {@link ScanBuilder} that builds {@link Scan} with {@link 
Scan#toBatch()}
- * implemented.
- * 
+ * A mix-in interface for {@link Table} to provide data reading ability of 
batch processing.
  */
 @Evolving
-public interface SupportsBatchRead extends Table { }
+public interface SupportsBatchRead extends Table {
+
+  /**
+   * Returns the schema of this table.
+   */
+  StructType schema();
--- End diff --

I'm not sure about this. Maybe it's OK to leave `schema` in `Table`, and 
ask write-only tables to report an empty schema.
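
The alternative could be sketched roughly as follows. These are simplified, hypothetical Scala interfaces (`Field`, `InMemoryTable`, `WriteOnlySink`, `SchemaValidation`), not the actual Java data source v2 API: `schema` stays on the base `Table`, and a write-only table simply reports an empty one.

```scala
// Hypothetical, simplified interfaces; not the actual data source v2 API.
final case class Field(name: String, dataType: String)

trait Table {
  def name: String
  def schema: Seq[Field] // stays on the base interface
}

// Marker mix-in for tables that can be scanned in batch mode.
trait SupportsBatchRead extends Table

final class InMemoryTable(val name: String, val schema: Seq[Field]) extends SupportsBatchRead

// A write-only table has nothing to read, so it reports an empty schema.
final class WriteOnlySink(val name: String) extends Table {
  override def schema: Seq[Field] = Seq.empty
}

object SchemaValidation {
  // A caller such as an append operation could skip validation when the schema is empty.
  def needsValidation(table: Table): Boolean = table.schema.nonEmpty
}
```

The trade-off is that an empty schema becomes a convention callers must know about, whereas moving `schema` into the read mix-in makes the distinction visible in the type.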


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r240028574
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,52 +17,49 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.util.UUID
-
-import scala.collection.JavaConverters._
+import java.util.{Optional, UUID}
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
 import org.apache.spark.sql.types.StructType
 
 /**
- * A logical plan representing a data source v2 scan.
+ * A logical plan representing a data source v2 table.
  *
- * @param source An instance of a [[DataSourceV2]] implementation.
- * @param options The options for this scan. Used to create fresh 
[[BatchWriteSupport]].
- * @param userSpecifiedSchema The user-specified schema for this scan.
+ * @param table The table that this relation represents.
+ * @param options The options for this table operation. It's used to 
create fresh [[ScanBuilder]]
+ *and [[BatchWriteSupport]].
  */
 case class DataSourceV2Relation(
-// TODO: remove `source` when we finish API refactor for write.
-source: TableProvider,
-table: SupportsBatchRead,
+table: Table,
 output: Seq[AttributeReference],
-options: Map[String, String],
-userSpecifiedSchema: Option[StructType] = None)
+// TODO: use a simple case insensitive map instead.
+options: DataSourceOptions)
--- End diff --

It was done in multiple places before:

https://github.com/apache/spark/pull/23208/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23L62

https://github.com/apache/spark/pull/23208/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23L153

https://github.com/apache/spark/pull/23208/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23L171

If you prefer it strongly, I can follow it and update the code.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r240028515
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -25,7 +25,10 @@
  * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
  * have a public, 0-arg constructor.
  * 
- * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write. If you
+ * want to allow end-users to write data to non-existing tables via write 
APIs in `DataFrameWriter`
+ * with `SaveMode`, you must return a {@link Table} instance even if the 
table doesn't exist. The
+ * table schema can be empty in this case.
--- End diff --

I'm not convinced it's safe to remove `SaveMode` right away, when there is 
only an `Append` operator implemented currently.

If we do it, it means `DataFrameWriter.save` needs to throw an exception in 
a lot of cases, except when the `mode` is append. I don't think this is 
acceptable right now.

Can we discuss the removal of `SaveMode` at least after all the necessary 
new write operators are implemented?


---




[GitHub] spark pull request #23266: [SPARK-26313][SQL] move read related methods from...

2018-12-09 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/23266

[SPARK-26313][SQL] move read related methods from Table to read related mix-in traits

## What changes were proposed in this pull request?

As discussed in https://github.com/apache/spark/pull/23208/files#r239684490 
, we should put read-related methods in read-related mix-in traits like 
`SupportsBatchRead`, to support write-only tables.

In the `Append` operator, we should skip schema validation if the table is 
write-only. This will be done when we finish the write API refactor in 
https://github.com/apache/spark/pull/23208/

## How was this patch tested?

existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark ds-read

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23266.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23266


commit da520cc6e7daa36c7d7fabdb0a08d4b4341250b9
Author: Wenchen Fan 
Date:   2018-12-09T08:32:51Z

move read related methods from Table to read related mix-in traits




---




[GitHub] spark issue #23266: [SPARK-26313][SQL] move read related methods from Table ...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23266
  
cc @rdblue @HyukjinKwon @gatorsmile 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23265: [2.4][SPARK-26021][SQL][FOLLOWUP] only deal with NaN and...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23265
  
retest this please


---




[GitHub] spark issue #23259: [SPARK-26215][SQL][WIP] Define reserved/non-reserved key...

2018-12-09 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23259
  
thanks @maropu for starting it!

> Which SQL standard does Spark SQL follow (e.g., 2011 or 2016)?

I think SQL 2011 is good, but if we can't find a public version, maybe it's 
also OK to follow 
[postgres](https://github.com/postgres/postgres/blob/ee2b37ae044f34851baba69e9ba737077326414e/src/backend/parser/gram.y#L15366)

> Where should we handle reserved key words?

I think it should be `SqlBase.g4`, but a problem is that the g4 file defines 
`non-reserved` keywords, not `reserved` ones. Maybe we need to update it.

> Where should we document the list of reserved/non-reserved key words?

I think the new files you created in this PR are a good place to document it.


---




[GitHub] spark pull request #23258: [SPARK-23375][SQL][FOLLOWUP][TEST] Test Sort metr...

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23258#discussion_r240026727
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 ---
@@ -182,10 +182,13 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
   }
 
   test("Sort metrics") {
-// Assume the execution plan is
-// WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1))
-val ds = spark.range(10).sort('id)
-testSparkPlanMetrics(ds.toDF(), 2, Map.empty)
+// Assume the execution plan with node id is
+// Sort(nodeId = 0)
+//   Exchange(nodeId = 1)
+// LocalTableScan(nodeId = 2)
+val df = Seq(1, 3, 2).toDF("id").sort('id)
+testSparkPlanMetrics(df, 2, Map.empty)
--- End diff --

can we check the metrics of `SortExec` here?


---




[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r240026485
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
@@ -118,10 +115,12 @@ case class HashClusteredDistribution(
 
 /**
  * Represents data where tuples have been ordered according to the 
`ordering`
- * [[Expression Expressions]].  This is a strictly stronger guarantee than
- * [[ClusteredDistribution]] as an ordering will ensure that tuples that 
share the
- * same value for the ordering expressions are contiguous and will never 
be split across
- * partitions.
+ * [[Expression Expressions]]. Its requirement is defined as the following:
+ *   - Given any 2 adjacent partitions, all the rows of the second 
partition must be larger than or
+ * equal to any row in the first partition, according to the 
`ordering` expressions.
--- End diff --

Note that only sort requires `OrderedDistribution`, and a global sort 
doesn't care whether equal rows are spread across partitions.

This is a definition of the requirement. When designing protocols, it's 
important to make requirements as weak as possible and guarantees as 
strong as possible.
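
To make the requirement concrete, here is a small sketch in plain Scala that checks it on integer rows. It models partitions as `Seq[Seq[Int]]` and ignores empty partitions; both choices, and the name `satisfiesOrderedRequirement`, are assumptions for illustration and not Spark code.

```scala
object OrderedDistributionSketch {
  // Returns true when, for every pair of adjacent (non-empty) partitions, each
  // row of the second partition is >= every row of the first one. Rows inside
  // a partition may be in any order, and equal rows may sit in both partitions.
  def satisfiesOrderedRequirement(partitions: Seq[Seq[Int]]): Boolean = {
    val nonEmpty = partitions.filter(_.nonEmpty)
    nonEmpty.zip(nonEmpty.drop(1)).forall { case (first, second) =>
      first.max <= second.min
    }
  }
}
```

Note that `Seq(Seq(2, 1), Seq(2, 3))` passes the check even though the value `2` appears in both partitions, which is exactly the weaker requirement described above.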


---




[GitHub] spark issue #23255: [SPARK-26307] [SQL] Fix CTAS when INSERT a partitioned t...

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23255
  
LGTM


---




[GitHub] spark pull request #23255: [SPARK-26307] [SQL] Fix CTAS when INSERT a partit...

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23255#discussion_r240026441
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -752,6 +752,17 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
 }
   }
 
+  test("CTAS: INSERT a partitioned table using Hive serde") {
--- End diff --

+1


---




[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23262#discussion_r240026394
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -53,7 +53,7 @@ object RDDConversions {
 data.mapPartitions { iterator =>
   val numColumns = outputTypes.length
   val mutableRow = new GenericInternalRow(numColumns)
-  val converters = 
outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
+  val converters = 
outputTypes.map(CatalystTypeConverters.createToCatalystConverter).toArray
--- End diff --

shall we use `RowEncoder` here?
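
For context on the `.toArray` change in the diff above, here is a hedged sketch of why an array helps in a per-row loop: indexed access on a list-backed `Seq` costs time proportional to the index, while an `Array` gives constant-time access. The converters below are hypothetical stand-ins, not the ones produced by `CatalystTypeConverters`.

```scala
object ConverterLookupSketch {
  // Hypothetical per-column converters, held in a list-backed Seq.
  val converters: Seq[Any => Any] = List(
    (v: Any) => v.toString,
    (v: Any) => v.asInstanceOf[Int] + 1
  )

  // Materialize once so the per-row loop gets constant-time indexed access.
  val converterArray: Array[Any => Any] = converters.toArray

  // Apply the i-th converter to the i-th column of the row.
  def convertRow(row: Array[Any]): Array[Any] = {
    val out = new Array[Any](row.length)
    var i = 0
    while (i < row.length) {
      out(i) = converterArray(i)(row(i))
      i += 1
    }
    out
  }
}
```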


---




[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23262#discussion_r240026388
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -33,7 +33,7 @@ object RDDConversions {
 data.mapPartitions { iterator =>
   val numColumns = outputTypes.length
   val mutableRow = new GenericInternalRow(numColumns)
-  val converters = 
outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
+  val converters = 
outputTypes.map(CatalystTypeConverters.createToCatalystConverter).toArray
--- End diff --

shall we use `ExpressionEncoder` here?


---




[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23248#discussion_r240026330
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] 
with PredicateHelper {
 expressions.flatMap(collectEvaluableUDFs)
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-case plan: LogicalPlan => extract(plan)
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+// SPARK-26293: A subquery will be rewritten into join later, and will 
go through this rule
+// eventually. Here we skip subquery, as Python UDF only needs to be 
extracted once.
+case _: Subquery => plan
--- End diff --

I think you have a point here. If a subquery will be converted to a join, 
why do we need to optimize the subquery ahead of time?

Anyway, that's something we need to discuss later. cc @dilipbiswal for the 
subquery question.


---




[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23253#discussion_r240026245
  
--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -35,7 +35,9 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Since Spark 3.0, CSV datasource uses java.time API for parsing and 
generating CSV content. New formatting implementation supports date/timestamp 
patterns conformed to ISO 8601. To switch back to the implementation used in 
Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
 
-  - In Spark version 2.4 and earlier, CSV datasource converts a malformed 
CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, 
returned row can contain non-`null` fields if some of CSV column values were 
parsed and converted to desired types successfully.
+  - In Spark version 2.4 and earlier, CSV datasource converts a malformed 
CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, 
the returned row can contain non-`null` fields if some of CSV column values 
were parsed and converted to desired types successfully.
+
+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions 
like `from_json` convert a bad JSON record to a row with all `null`s in the 
PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, the 
returned row can contain non-`null` fields if some of JSON column values were 
parsed and converted to desired types successfully.
--- End diff --

does `from_csv` support it?
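
The behavior change described in the migration note above can be pictured with a toy parser in plain Scala. This is only a sketch under the assumption that every column is an integer; `parseAllOrNothing` and `parsePartial` are hypothetical names and do not reflect how the CSV or JSON datasources are implemented.

```scala
import scala.util.Try

object PartialResultSketch {
  // Behavior described for 2.4 and earlier: one bad field nulls out the whole row.
  def parseAllOrNothing(fields: Seq[String]): Seq[Option[Int]] = {
    val parsed = fields.map(f => Try(f.trim.toInt).toOption)
    if (parsed.forall(_.isDefined)) parsed else fields.map(_ => None)
  }

  // Behavior described for 3.0: keep the fields that parsed successfully.
  def parsePartial(fields: Seq[String]): Seq[Option[Int]] =
    fields.map(f => Try(f.trim.toInt).toOption)
}
```

For the input `Seq("1", "x", "3")`, the first function yields three `None` values, while the second keeps `Some(1)` and `Some(3)` and only nulls the malformed middle field.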


---




[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23253#discussion_r240026237
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
 ---
@@ -229,6 +229,11 @@ private[json] trait TestJsonData {
   """{"date": "27/10/2014 18:30"}""" ::
   """{"date": "28/01/2016 20:00"}""" :: Nil))(Encoders.STRING)
 
+  def badRecords: Dataset[String] =
--- End diff --

if it's only used in one test, let's move it to that test


---




[GitHub] spark issue #23204: Revert "[SPARK-21052][SQL] Add hash map metrics to join"

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23204
  
+1


---




[GitHub] spark pull request #23265: [2.4][SPARK-26021][SQL][FOLLOWUP] only deal with ...

2018-12-08 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/23265

[2.4][SPARK-26021][SQL][FOLLOWUP] only deal with NaN and -0.0 in 
UnsafeWriter

backport https://github.com/apache/spark/pull/23239 to 2.4

-

## What changes were proposed in this pull request?

A followup of https://github.com/apache/spark/pull/23043

There are 4 places where we need to deal with NaN and -0.0:
1. Comparison expressions. `-0.0` and `0.0` should be treated as the same. 
Different NaNs should be treated as the same.
2. Join keys. `-0.0` and `0.0` should be treated as the same. Different NaNs 
should be treated as the same.
3. Grouping keys. `-0.0` and `0.0` should be assigned to the same group. 
Different NaNs should be assigned to the same group.
4. Window partition keys. `-0.0` and `0.0` should be treated as the same. 
Different NaNs should be treated as the same.

Case 1 is OK. Our comparison already handles NaN and -0.0, and for 
struct/array/map, we recursively compare the fields/elements.

Cases 2, 3 and 4 are problematic, as they compare the `UnsafeRow` binary 
directly: different NaNs have different binary representations, and so do 
-0.0 and 0.0.

To fix it, a simple solution is to normalize float/double when building 
unsafe data (`UnsafeRow`, `UnsafeArrayData`, `UnsafeMapData`). Then we don't 
need to worry about it anymore.

Following this direction, this PR moves the handling of NaN and -0.0 from 
`Platform` to `UnsafeWriter`, so that places like `UnsafeRow.setFloat` will not 
handle them, which reduces the perf overhead. It's also easier to add comments 
explaining why we do it in `UnsafeWriter`.
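
The binary-comparison problem can be reproduced in plain Scala. The sketch below only uses `java.lang.Double` bit conversions; the helper names `normalize`, `bits`, `negZero` and `otherNaN` are made up for illustration and are not the code in `UnsafeWriter`.

```scala
object FloatNormalizationSketch {
  import java.lang.{Double => JDouble}

  // Map every NaN to the canonical NaN and -0.0 to 0.0; leave other values alone.
  def normalize(d: Double): Double =
    if (d.isNaN) Double.NaN
    else if (d == 0.0d) 0.0d // -0.0 == 0.0 is true, so this branch also catches -0.0
    else d

  // Raw bit pattern, which is what a byte-wise comparison effectively sees.
  def bits(d: Double): Long = JDouble.doubleToRawLongBits(d)

  // Negative zero: only the sign bit is set.
  val negZero: Double = JDouble.longBitsToDouble(Long.MinValue)

  // A NaN built from a bit pattern other than the canonical one.
  val otherNaN: Double = JDouble.longBitsToDouble(0x7ff8000000000001L)
}
```

Here `negZero == 0.0` holds as a numeric comparison while `bits(negZero)` and `bits(0.0)` differ, so a byte-wise key comparison would split them. After `normalize`, both zeros and all NaNs share one bit pattern.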

## How was this patch tested?

existing tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark minor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23265.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23265


commit 6a837c019eaf7bc9907715a54778bfbb339f3342
Author: Wenchen Fan 
Date:   2018-12-08T19:18:09Z

[SPARK-26021][SQL][FOLLOWUP] only deal with NaN and -0.0 in UnsafeWriter


Closes #23239 from cloud-fan/minor.

Authored-by: Wenchen Fan 
Signed-off-by: Dongjoon Hyun 




---




[GitHub] spark issue #23265: [2.4][SPARK-26021][SQL][FOLLOWUP] only deal with NaN and...

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23265
  
cc @dongjoon-hyun 


---




[GitHub] spark pull request #23201: [SPARK-26246][SQL] Infer date and timestamp types...

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23201#discussion_r240022552
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 ---
@@ -121,7 +122,26 @@ private[sql] class JsonInferSchema(options: 
JSONOptions) extends Serializable {
 DecimalType(bigDecimal.precision, bigDecimal.scale)
 }
 decimalTry.getOrElse(StringType)
-  case VALUE_STRING => StringType
+  case VALUE_STRING =>
+val stringValue = parser.getText
--- End diff --

If we switch the order here, we don't need the length check 
[here](https://github.com/apache/spark/pull/23201/files#diff-e925de14239f40430d05f9ffd0360f10R130), 
right?


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23207
  
thanks, merging to master!


---




[GitHub] spark issue #23204: Revert "[SPARK-21052][SQL] Add hash map metrics to join"

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23204
  
According to 
https://github.com/apache/spark/pull/23214#issuecomment-443999282 , the hash 
join metrics are implemented incorrectly. I think it's fine to revert them and 
re-implement them later.

@JkSelf can you address the comments and only revert the hash join part? 
Thanks!


---




[GitHub] spark issue #23249: [SPARK-26297][SQL] improve the doc of Distribution/Parti...

2018-12-07 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23249
  
retest this please


---




[GitHub] spark pull request #22104: [SPARK-24721][SQL] Extract Python UDFs at the end...

2018-12-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r239738437
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala ---
@@ -31,7 +31,8 @@ class SparkOptimizer(
 
   override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ 
super.defaultBatches :+
 Batch("Optimize Metadata Only Query", Once, 
OptimizeMetadataOnlyQuery(catalog)) :+
-Batch("Extract Python UDF from Aggregate", Once, 
ExtractPythonUDFFromAggregate) :+
+Batch("Extract Python UDFs", Once,
+  Seq(ExtractPythonUDFFromAggregate, ExtractPythonUDFs): _*) :+
--- End diff --

but we already have `ExtractPythonUDFFromAggregate` here...


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239736660
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -78,6 +80,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NORMALIZE_TIMING_METRIC = "normalizeTiming"
--- End diff --

`private val NS_TIMING_METRIC = "nsTiming"`


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239735814
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -78,6 +80,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NORMALIZE_TIMING_METRIC = "normalizeTiming"
--- End diff --

Actually I think your previous naming is good, sorry for the back and forth.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239735425
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -78,6 +80,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NORMALIZE_TIMING_METRIC = "normalizeTiming"
--- End diff --

maybe `nsToMsTiming`?


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239735015
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, 
TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create 
a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each 
ShuffleMapTask.
+ */
+private[spark] class ShuffleWriteProcessor extends Serializable with 
Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the task context, 
always return a proxy
+   * reporter for both local accumulator and original reporter updating. 
As the reporter is a
+   * per-row operator, here need a careful consideration on performance.
+   */
+  def createMetricsReporter(context: TaskContext): 
ShuffleWriteMetricsReporter = {
--- End diff --

this can be protected?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239734920
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, 
TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create 
a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each 
ShuffleMapTask.
+ */
+private[spark] class ShuffleWriteProcessor extends Serializable with 
Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the task context, 
always return a proxy
+   * reporter for both local accumulator and original reporter updating. 
As the reporter is a
--- End diff --

`always return a proxy reporter for both local accumulator and original 
reporter updating`

is it stale?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-12-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r239733875
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -95,9 +77,116 @@ case class CreateHiveTableAsSelectCommand(
 Seq.empty[Row]
   }
 
+  // Returns `DataWritingCommand` used to write data when the table exists.
+  def writingCommandForExistingTable(
+catalog: SessionCatalog,
+tableDesc: CatalogTable): DataWritingCommand
+
+  // Returns `DataWritingCommand` used to write data when the table 
doesn't exist.
+  def writingCommandForNewTable(
+catalog: SessionCatalog,
+tableDesc: CatalogTable): DataWritingCommand
+
   override def argString: String = {
 s"[Database:${tableDesc.database}, " +
 s"TableName: ${tableDesc.identifier.table}, " +
 s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, 
storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+tableDesc: CatalogTable,
+query: LogicalPlan,
+outputColumnNames: Seq[String],
+mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def writingCommandForExistingTable(
+  catalog: SessionCatalog,
+  tableDesc: CatalogTable): DataWritingCommand = {
+InsertIntoHiveTable(
+  tableDesc,
+  Map.empty,
--- End diff --

or open a new PR to allow CTAS for partitioned Hive tables first?


---




[GitHub] spark issue #23239: [SPARK-26021][SQL][followup] only deal with NaN and -0.0...

2018-12-07 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23239
  
I checked the original PR that handles NaN: 
https://github.com/apache/spark/commit/c032b0bf92130dc4facb003f0deaeb1228aefded

It didn't add end-to-end tests, so I added 2 new tests.


---




[GitHub] spark issue #23239: [SPARK-26021][SQL][followup] only deal with NaN and -0.0...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23239
  
Yes it is. `UnsafeProjection` always normalizes NaN and -0.0, and Spark uses 
`UnsafeProjection` to produce output, so users can't distinguish them.


---




[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239690226
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
@@ -22,13 +22,12 @@ import org.apache.spark.sql.types.{DataType, 
IntegerType}
 
 /**
  * Specifies how tuples that share common expressions will be distributed 
when a query is executed
- * in parallel on many machines.  Distribution can be used to refer to two 
distinct physical
- * properties:
- *  - Inter-node partitioning of data: In this case the distribution 
describes how tuples are
- *partitioned across physical machines in a cluster.  Knowing this 
property allows some
- *operators (e.g., Aggregate) to perform partition local operations 
instead of global ones.
- *  - Intra-partition ordering of data: In this case the distribution 
describes guarantees made
- *about how tuples are distributed within a single partition.
+ * in parallel on many machines.
+ *
+ * Distribution here refers to inter-node partitioning of data:
+ *   The distribution describes how tuples are partitioned across physical 
machines in a cluster.
+ *   Knowing this property allows some operators (e.g., Aggregate) to 
perform partition local
+ *   operations instead of global ones.
  */
--- End diff --

for ordering, I think people can look at `OrderedDistribution`?


---




[GitHub] spark pull request #23201: [SPARK-26246][SQL] Infer date and timestamp types...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23201#discussion_r239687264
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 ---
@@ -121,7 +122,26 @@ private[sql] class JsonInferSchema(options: 
JSONOptions) extends Serializable {
 DecimalType(bigDecimal.precision, bigDecimal.scale)
 }
 decimalTry.getOrElse(StringType)
-  case VALUE_STRING => StringType
+  case VALUE_STRING =>
+val stringValue = parser.getText
--- End diff --

or the order doesn't matter?


---




[GitHub] spark pull request #23201: [SPARK-26246][SQL] Infer date and timestamp types...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23201#discussion_r239687213
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 ---
@@ -121,7 +122,26 @@ private[sql] class JsonInferSchema(options: 
JSONOptions) extends Serializable {
 DecimalType(bigDecimal.precision, bigDecimal.scale)
 }
 decimalTry.getOrElse(StringType)
-  case VALUE_STRING => StringType
+  case VALUE_STRING =>
+val stringValue = parser.getText
--- End diff --

I checked `PartitioningUtils.inferPartitionColumnValue`: there we try 
timestamp first and then date. Shall we follow it?
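
A minimal sketch of the "timestamp first, then date, then string" order, using the strict ISO parsers from `java.time` as stand-ins. The `Inferred` type and `infer` function are hypothetical; this is not how `JsonInferSchema` or `PartitioningUtils` parses values.

```scala
import java.time.{LocalDate, LocalDateTime}
import scala.util.Try

object InferOrderSketch {
  sealed trait Inferred
  case object TimestampLike extends Inferred
  case object DateLike extends Inferred
  case object StringLike extends Inferred

  // Try the timestamp parser first, then the date parser, then fall back to string.
  def infer(value: String): Inferred =
    if (Try(LocalDateTime.parse(value)).isSuccess) TimestampLike
    else if (Try(LocalDate.parse(value)).isSuccess) DateLike
    else StringLike
}
```

With strict parsers like these, no input is accepted by both branches, so swapping the order would not change the result. The order only matters when some string is accepted by both parsers.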


---




[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23248#discussion_r239686156
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] 
with PredicateHelper {
 expressions.flatMap(collectEvaluableUDFs)
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-case plan: LogicalPlan => extract(plan)
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+// SPARK-26293: A subquery will be rewritten into join later, and will 
go through this rule
+// eventually. Here we skip subquery, as Python UDF only needs to be 
extracted once.
+case _: Subquery => plan
--- End diff --

I agree it's a bit confusing, but that's how `Subquery` is designed to 
work. See how `RemoveRedundantAliases` catches `Subquery`.

Making `ExtractPythonUDFs` idempotent is sufficient. Skipping `Subquery` is 
just an extra safeguard, and may bring a small perf improvement since this 
rule will run less often.

In general, I think we should skip `Subquery` here. This is why we create 
`Subquery`: we expect rules that don't want to be executed on subquery to skip 
it. I'll check more rules and see if they need to skip `Subquery` later.
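
A toy sketch of the two properties mentioned above, skipping `Subquery` and being idempotent. The plan classes and the `udf:` string convention are hypothetical stand-ins, not Catalyst's real `LogicalPlan` types:

```scala
// Toy plan nodes, hypothetical stand-ins for Catalyst's LogicalPlan classes.
sealed trait Plan
case class Relation(name: String) extends Plan
case class Project(exprs: Seq[String], child: Plan) extends Plan
case class EvalUdf(udfs: Seq[String], child: Plan) extends Plan
case class Subquery(child: Plan) extends Plan

object ExtractUdfsSketch {
  private def isUdf(expr: String): Boolean = expr.startsWith("udf:")

  def apply(plan: Plan): Plan = plan match {
    // Skip the subquery wrapper entirely, including everything below it.
    case _: Subquery => plan
    case Project(exprs, child) if exprs.exists(isUdf) =>
      val (udfs, others) = exprs.partition(isUdf)
      val results = udfs.map(_.stripPrefix("udf:"))
      // The UDF calls move into EvalUdf; Project only references their results.
      Project(others ++ results, EvalUdf(udfs, apply(child)))
    case Project(exprs, child) => Project(exprs, apply(child))
    case EvalUdf(udfs, child) => EvalUdf(udfs, apply(child))
    case r: Relation => r
  }
}
```

Running the rule a second time finds no `udf:` expression left in any `Project`, so the plan is returned unchanged; that is the idempotence the comment relies on.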


---




[GitHub] spark issue #23215: [SPARK-26263][SQL] Validate partition values with user p...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23215
  
thanks, merging to master!


---




[GitHub] spark issue #23239: [SPARK-26021][SQL][followup] only deal with NaN and -0.0...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23239
  
retest this please


---




[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239684697
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
@@ -22,13 +22,12 @@ import org.apache.spark.sql.types.{DataType, 
IntegerType}
 
 /**
  * Specifies how tuples that share common expressions will be distributed 
when a query is executed
- * in parallel on many machines.  Distribution can be used to refer to two 
distinct physical
- * properties:
- *  - Inter-node partitioning of data: In this case the distribution 
describes how tuples are
- *partitioned across physical machines in a cluster.  Knowing this 
property allows some
- *operators (e.g., Aggregate) to perform partition local operations 
instead of global ones.
- *  - Intra-partition ordering of data: In this case the distribution 
describes guarantees made
- *about how tuples are distributed within a single partition.
+ * in parallel on many machines.
+ *
+ * Distribution here refers to inter-node partitioning of data:
+ *   The distribution describes how tuples are partitioned across physical 
machines in a cluster.
+ *   Knowing this property allows some operators (e.g., Aggregate) to 
perform partition local
+ *   operations instead of global ones.
  */
--- End diff --

I intentionally removed everything about intra-partition ordering, as we never 
leverage it and no partitioning provides this property. Did I miss something?


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239684490
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java 
---
@@ -25,14 +25,14 @@
 import org.apache.spark.sql.types.StructType;
 
 /**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * A mix-in interface for {@link Table}. Data sources can implement this 
interface to
  * provide data writing ability for batch processing.
  *
  * This interface is used to create {@link BatchWriteSupport} instances 
when end users run
  * {@code Dataset.write.format(...).option(...).save()}.
  */
 @Evolving
-public interface BatchWriteSupportProvider extends DataSourceV2 {
+public interface SupportsBatchWrite extends Table {
--- End diff --

I do think read-only or write-only is a necessary feature, according to 
what I've seen in the dev list. Maybe we should move `newScanBuilder` from 
`Table` to the mixin traits.
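
A rough sketch of what that could look like if both directions live in mixin traits. All names and signatures here are simplified assumptions, not the actual v2 interfaces:

```scala
// Hypothetical, simplified stand-ins for the v2 interfaces under discussion.
trait Table { def name: String }
trait SupportsBatchRead extends Table { def newScanBuilder(): String }
trait SupportsBatchWrite extends Table { def newWriteSupport(): String }

// A write-only sink never has to implement newScanBuilder.
class LogSink extends SupportsBatchWrite {
  def name: String = "log_sink"
  def newWriteSupport(): String = s"write($name)"
}

object Capabilities {
  // The engine can inspect which capabilities a table mixes in.
  def describe(t: Table): String = {
    val canRead = t.isInstanceOf[SupportsBatchRead]
    val canWrite = t.isInstanceOf[SupportsBatchWrite]
    (canRead, canWrite) match {
      case (true, true)   => "read-write"
      case (true, false)  => "read-only"
      case (false, true)  => "write-only"
      case (false, false) => "no batch support"
    }
  }
}
```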


---




[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23208
  
@rdblue I tried to add `WriteBuilder`, but there is a difference between 
read and write:
1. for read, the `ScanBuilder` can collect a lot of information together, such as 
column pruning and filter pushdown, and then create a `Scan`.
2. for write, there are just different branches, not a combination. e.g. you 
can't do append and replaceWhere at the same time.

Because of this, I feel we don't need `WriteBuilder`, but just different 
mixin traits to create `Write` for different purposes.
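
To illustrate the asymmetry, a simplified sketch. The `Scan`, `ScanBuilder`, `Write` and `Supports*` definitions below are hypothetical and much smaller than the real API:

```scala
// Hypothetical, simplified types; not the actual data source v2 API.
case class Scan(columns: Seq[String], filters: Seq[String])

// Read side: independent pieces of information accumulate in one builder.
class ScanBuilder {
  private var columns: Seq[String] = Seq.empty
  private var filters: Seq[String] = Seq.empty
  def pruneColumns(cols: String*): ScanBuilder = { columns = cols.toList; this }
  def pushFilter(f: String): ScanBuilder = { filters = filters :+ f; this }
  def build(): Scan = Scan(columns, filters)
}

// Write side: the modes are alternatives, so each one is a separate entry point.
sealed trait Write
case object Append extends Write
case class ReplaceWhere(condition: String) extends Write

trait Table
trait SupportsAppend extends Table { def newAppend(): Write = Append }
trait SupportsReplaceWhere extends Table {
  def newReplaceWhere(condition: String): Write = ReplaceWhere(condition)
}
```

A caller can chain `pruneColumns` and `pushFilter` on the same builder, while a write picks exactly one of the entry points.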

Let me know if you have other ideas. Thanks for your review!


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239683592
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,52 +17,49 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.util.UUID
-
-import scala.collection.JavaConverters._
+import java.util.{Optional, UUID}
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
 import org.apache.spark.sql.types.StructType
 
 /**
- * A logical plan representing a data source v2 scan.
+ * A logical plan representing a data source v2 table.
  *
- * @param source An instance of a [[DataSourceV2]] implementation.
- * @param options The options for this scan. Used to create fresh 
[[BatchWriteSupport]].
- * @param userSpecifiedSchema The user-specified schema for this scan.
+ * @param table The table that this relation represents.
+ * @param options The options for this table operation. It's used to 
create fresh [[ScanBuilder]]
+ *and [[BatchWriteSupport]].
  */
 case class DataSourceV2Relation(
-// TODO: remove `source` when we finish API refactor for write.
-source: TableProvider,
-table: SupportsBatchRead,
+table: Table,
 output: Seq[AttributeReference],
-options: Map[String, String],
-userSpecifiedSchema: Option[StructType] = None)
+// TODO: use a simple case insensitive map instead.
+options: DataSourceOptions)
--- End diff --

Because this makes the code cleaner. Otherwise I would need to write more code to 
convert a map to `DataSourceOptions` multiple times inside 
`DataSourceV2Relation`.

I don't have a strong preference here and just picked the easiest approach 
for me. If you think using a map here is clearer, I can add the extra code.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239682984
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
 assertNotBucketed("save")
 
-val cls = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
-if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-  val source = 
cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
-  source match {
-case provider: BatchWriteSupportProvider =>
-  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-source,
-df.sparkSession.sessionState.conf)
-  val options = sessionOptions ++ extraOptions
-
+val session = df.sparkSession
+val cls = DataSource.lookupDataSource(source, 
session.sessionState.conf)
+if (classOf[TableProvider].isAssignableFrom(cls)) {
+  val provider = 
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+provider, session.sessionState.conf)
+  val options = sessionOptions ++ extraOptions
+  val dsOptions = new DataSourceOptions(options.asJava)
+  provider.getTable(dsOptions) match {
+case table: SupportsBatchWrite =>
+  val relation = DataSourceV2Relation.create(table, dsOptions)
+  // TODO: revisit it. We should not create the `AppendData` 
operator for `SaveMode.Append`.
+  // We should create new end-users APIs for the `AppendData` 
operator.
--- End diff --

Yea, that's why I only left a comment asking to revisit it later. I 
think we will see a clearer picture after we migrate the file source.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239682239
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -25,7 +25,10 @@
  * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
  * have a public, 0-arg constructor.
  * 
- * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write. If you
+ * want to allow end-users to write data to non-existing tables via write 
APIs in `DataFrameWriter`
+ * with `SaveMode`, you must return a {@link Table} instance even if the 
table doesn't exist. The
+ * table schema can be empty in this case.
--- End diff --

I don't want to break existing use cases: file sources can overwrite/append 
to a non-existing location, and we still need to support that with `SaveMode`.

Whatever the new write API will be, I think we still need to support 
`SaveMode` for a while.


---




[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23207
  
the code looks much cleaner now!


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239677846
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -78,6 +80,7 @@ object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val NS_TIMING_METRIC = "nanosecond"
--- End diff --

Can we change it to `ms`? The core side can still be `ns`, but on the SQL side 
we truncate it to `ms`.
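
A small sketch of that split, assuming the nanosecond values are accumulated first and only truncated for the SQL-side metric. `TimingUnits` is a hypothetical helper; `TimeUnit.NANOSECONDS.toMillis` is the standard JDK conversion:

```scala
import java.util.concurrent.TimeUnit

// Hypothetical helper, not part of Spark.
object TimingUnits {
  // Keep nanosecond precision while accumulating; truncate only for display.
  def nanosToDisplayMillis(totalNanos: Long): Long =
    TimeUnit.NANOSECONDS.toMillis(totalNanos)
}
```

Truncating each small update separately would lose time that the sum preserves: three updates of 600000 ns each truncate to 0 ms individually, but their total truncates to 1 ms.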


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239677653
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -333,8 +343,19 @@ object ShuffleExchangeExec {
   new ShuffleDependency[Int, InternalRow, InternalRow](
 rddWithPartitionIds,
 new PartitionIdPassthrough(part.numPartitions),
-serializer)
+serializer,
+shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics))
 
 dependency
   }
+
+  /**
+   * Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the 
default metrics reporter
+   * with [[SQLShuffleWriteMetricsReporter]] as new reporter for 
[[ShuffleWriteProcessor]].
+   */
+  def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): 
ShuffleWriteProcessor = {
+(reporter: ShuffleWriteMetricsReporter) => {
--- End diff --

Does this work with Scala 2.11? Maybe we don't need to be that fancy and 
can just write
```
new ShuffleWriteProcessor {
  xxx
}
```
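
For reference, a sketch of the two forms with hypothetical stand-in traits (`Reporter` and `WriteProcessor` are not the real Spark types). Converting a lambda to a single-abstract-method trait is standard in Scala 2.12; on 2.11 it is only available behind `-Xexperimental`, while the anonymous class compiles on both:

```scala
// Hypothetical stand-ins for the reporter and processor types.
trait Reporter { def incRecords(n: Long): Unit }

trait WriteProcessor extends Serializable {
  def createMetricsReporter(reporter: Reporter): Reporter
}

object SamDemo {
  // Anonymous class: compiles on Scala 2.11 and 2.12.
  val explicit: WriteProcessor = new WriteProcessor {
    override def createMetricsReporter(reporter: Reporter): Reporter = reporter
  }

  // Lambda converted to the single-abstract-method trait: needs Scala 2.12+.
  val viaLambda: WriteProcessor = (reporter: Reporter) => reporter
}
```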


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239677477
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, 
TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create 
a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each 
ShuffleMapTask.
+ */
+private[spark] trait ShuffleWriteProcessor extends Serializable with 
Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, 
always return a proxy
+   * reporter for both local accumulator and original reporter updating. 
As the reporter is a
+   * per-row operator, here need a careful consideration on performance.
+   */
+  def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): 
ShuffleWriteMetricsReporter
--- End diff --

After that change, we can just make `ShuffleWriteProcessor` a class.


---




[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23207#discussion_r239677325
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, 
TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.MapStatus
+
+/**
+ * The interface for customizing shuffle write process. The driver create 
a ShuffleWriteProcessor
+ * and put it into [[ShuffleDependency]], and executors use it in each 
ShuffleMapTask.
+ */
+private[spark] trait ShuffleWriteProcessor extends Serializable with 
Logging {
+
+  /**
+   * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, 
always return a proxy
+   * reporter for both local accumulator and original reporter updating. 
As the reporter is a
+   * per-row operator, here need a careful consideration on performance.
+   */
+  def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): 
ShuffleWriteMetricsReporter
--- End diff --

how about `def createMetricsReporter(context: TaskContext)`?

Then in core it's implemented as
```
context.taskMetrics().shuffleWriteMetrics
```

and in SQL
```
new SQLShuffle.Reporter(context.taskMetrics().shuffleWriteMetrics)
```
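
Put together, the suggestion could look roughly like the sketch below. Every type here is a hypothetical stand-in (the real `TaskContext` and reporter classes live in Spark and have different members); it only shows a concrete class with a default and a SQL-side override that wraps it:

```scala
// Hypothetical stand-ins; the real TaskContext and reporters live in Spark.
trait MetricsReporter { def incRecordsWritten(n: Long): Unit }

class TaskWriteMetrics extends MetricsReporter {
  var recordsWritten: Long = 0L
  def incRecordsWritten(n: Long): Unit = recordsWritten += n
}

class TaskContext { val shuffleWriteMetrics = new TaskWriteMetrics }

// Core: a concrete class whose default reporter is the task's own metrics.
class WriteProcessor extends Serializable {
  def createMetricsReporter(context: TaskContext): MetricsReporter =
    context.shuffleWriteMetrics
}

// SQL: wraps the default reporter and also updates its own counter.
class SqlReporter(underlying: MetricsReporter) extends MetricsReporter {
  var sqlRecordsWritten: Long = 0L
  def incRecordsWritten(n: Long): Unit = {
    underlying.incRecordsWritten(n)
    sqlRecordsWritten += n
  }
}

class SqlWriteProcessor extends WriteProcessor {
  override def createMetricsReporter(context: TaskContext): MetricsReporter =
    new SqlReporter(context.shuffleWriteMetrics)
}
```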


---




[GitHub] spark issue #23244: [SPARK-26289][CORE]cleanup enablePerfMetrics parameter f...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23244
  
thanks, merging to master!


---




[GitHub] spark pull request #23244: [SPARK-26289][CORE]cleanup enablePerfMetrics para...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23244#discussion_r239675382
  
--- Diff: 
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---
@@ -209,23 +205,14 @@ public BytesToBytesMap(
   TaskMemoryManager taskMemoryManager,
--- End diff --

OK all tests, then we are fine


---




[GitHub] spark pull request #23201: [SPARK-26246][SQL] Infer date and timestamp types...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23201#discussion_r239539848
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 ---
@@ -121,7 +122,26 @@ private[sql] class JsonInferSchema(options: 
JSONOptions) extends Serializable {
 DecimalType(bigDecimal.precision, bigDecimal.scale)
 }
 decimalTry.getOrElse(StringType)
-  case VALUE_STRING => StringType
+  case VALUE_STRING =>
+val stringValue = parser.getText
--- End diff --

sure. How many text data sources already support it?


---




[GitHub] spark pull request #23201: [SPARK-26246][SQL] Infer date and timestamp types...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23201#discussion_r239534668
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 ---
@@ -121,7 +122,26 @@ private[sql] class JsonInferSchema(options: 
JSONOptions) extends Serializable {
 DecimalType(bigDecimal.precision, bigDecimal.scale)
 }
 decimalTry.getOrElse(StringType)
-  case VALUE_STRING => StringType
+  case VALUE_STRING =>
+val stringValue = parser.getText
--- End diff --

shall we abstract out this logic for all the text sources?


---




[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239508488
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
@@ -118,10 +116,13 @@ case class HashClusteredDistribution(
 
 /**
  * Represents data where tuples have been ordered according to the 
`ordering`
- * [[Expression Expressions]].  This is a strictly stronger guarantee than
- * [[ClusteredDistribution]] as an ordering will ensure that tuples that 
share the
- * same value for the ordering expressions are contiguous and will never 
be split across
- * partitions.
+ * [[Expression Expressions]].
+ *
+ * Tuples that share the same values for the ordering expressions must be 
contiguous within a
+ * partition. They can also across partitions, but these partitions must 
be contiguous. For example,
+ * if value `v` is the biggest values in partition 3, it can also be in 
partition 4 as the smallest
+ * value. If all the values in partition 4 are `v`, it can also be in 
partition 5 as the smallest
+ * value.
  */
 case class OrderedDistribution(ordering: Seq[SortOrder]) extends 
Distribution {
--- End diff --

This is only used by sort, and sort doesn't require rows with the same value to 
be colocated in the same partition.

Actually we already use this knowledge to optimize 
`RangePartitioning.satisfy`.
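
A toy check of that claim, modelling partitions as sequences of integers. `DistributionCheck` and both predicates are illustrative assumptions, not Catalyst code:

```scala
// Hypothetical model: a dataset is a sequence of partitions of Int keys.
object DistributionCheck {
  // Ordered: reading the partitions one after another yields sorted data.
  def isOrdered(parts: Seq[Seq[Int]]): Boolean = {
    val flat = parts.flatten
    flat == flat.sorted
  }

  // Clustered: every distinct value lives in exactly one partition.
  def isClustered(parts: Seq[Seq[Int]]): Boolean = {
    val owners = for {
      (part, index) <- parts.zipWithIndex
      value <- part.distinct
    } yield (value, index)
    owners.groupBy(_._1).values.forall(_.size == 1)
  }
}
```

For the layout `Seq(Seq(1, 2, 3), Seq(3, 3), Seq(3, 4))`, the value 3 spans three adjacent partitions: the data is still ordered but not clustered, so an ordering guarantee does not imply the clustering one.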


---




[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239508437
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
@@ -118,10 +116,13 @@ case class HashClusteredDistribution(
 
 /**
  * Represents data where tuples have been ordered according to the 
`ordering`
- * [[Expression Expressions]].  This is a strictly stronger guarantee than
- * [[ClusteredDistribution]] as an ordering will ensure that tuples that 
share the
- * same value for the ordering expressions are contiguous and will never 
be split across
- * partitions.
+ * [[Expression Expressions]].
+ *
+ * Tuples that share the same values for the ordering expressions must be 
contiguous within a
+ * partition. They can also across partitions, but these partitions must 
be contiguous. For example,
+ * if value `v` is the biggest values in partition 3, it can also be in 
partition 4 as the smallest
+ * value. If all the values in partition 4 are `v`, it can also be in 
partition 5 as the smallest
+ * value.
  */
 case class OrderedDistribution(ordering: Seq[SortOrder]) extends 
Distribution {
--- End diff --

This is only used by sort, and sort doesn't require rows with the same value to 
be colocated in the same partition.

Actually we already use this knowledge to optimize 
`RangePartitioning.satisfy`.


---




[GitHub] spark pull request #23239: [SPARK-26021][SQL][followup] only deal with NaN a...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23239#discussion_r239507673
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java
 ---
@@ -198,11 +198,45 @@ protected final void writeLong(long offset, long 
value) {
 Platform.putLong(getBuffer(), offset, value);
   }
 
+  // We need to take care of NaN and -0.0 in several places:
+  //   1. When compare values, different NaNs should be treated as same, 
`-0.0` and `0.0` should be
+  //  treated as same.
+  //   2. In range partitioner, different NaNs should belong to the same 
partition, -0.0 and 0.0
--- End diff --

It turns out this is not a problem. The doc of `RangePartitioning` is 
misleading. I'm updating the doc at https://github.com/apache/spark/pull/23249
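
For context on point 1 of the quoted code comment, a small sketch of why `-0.0` and NaN need care when values are written as raw bytes: values that compare as equal can still have different bit patterns. `FloatNormalization` is a hypothetical helper, not the code in `UnsafeWriter`:

```scala
// Hypothetical helper, not the actual UnsafeWriter logic.
object FloatNormalization {
  def rawBits(d: Double): Long = java.lang.Double.doubleToRawLongBits(d)

  def normalize(d: Double): Double =
    if (d.isNaN) Double.NaN      // collapse every NaN payload to the canonical NaN
    else if (d == 0.0d) 0.0d     // -0.0 == 0.0 is true, so this maps -0.0 to 0.0
    else d
}
```

Comparing by value treats `-0.0` and `0.0` as the same, but their raw bits differ, so a byte-wise comparison or hash would see two keys unless the value is normalized before writing.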


---




[GitHub] spark issue #23249: [SPARK-26297][SQL] improve the doc of Distribution/Parti...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23249
  
cc @maryannxue @hvanhovell @gatorsmile @viirya 


---




[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...

2018-12-06 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/23249

[SPARK-26297][SQL] improve the doc of Distribution/Partitioning

## What changes were proposed in this pull request?

Some docs of `Distribution/Partitioning` are stale and misleading. 
This PR fixes them:
1. `ClusteredDistribution` doesn't have an intra-partition requirement.
2. `OrderedDistribution` does not require tuples that share the same value 
to be colocated in the same partition.
3. `RangePartitioning` can provide a weaker guarantee for a prefix of its 
`ordering` expressions.

## How was this patch tested?

comment-only PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark doc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23249.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23249


commit 24ea28abd5a385351703335df33b26838d203fe3
Author: Wenchen Fan 
Date:   2018-12-06T15:47:23Z

improve the doc of Distribution/Partitioning




---




[GitHub] spark issue #23248: [SPARK-26293][SQL] Cast exception when having python udf...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23248
  
retest this please


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239469368
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java 
---
@@ -25,14 +25,14 @@
 import org.apache.spark.sql.types.StructType;
 
 /**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * A mix-in interface for {@link Table}. Data sources can implement this 
interface to
  * provide data writing ability for batch processing.
  *
  * This interface is used to create {@link BatchWriteSupport} instances 
when end users run
  * {@code Dataset.write.format(...).option(...).save()}.
  */
 @Evolving
-public interface BatchWriteSupportProvider extends DataSourceV2 {
+public interface SupportsBatchWrite extends Table {
--- End diff --

That's why I left 
https://github.com/apache/spark/pull/23208#discussion_r238524973 .

Naming suggestions are welcome!


---




[GitHub] spark pull request #23215: [SPARK-26263][SQL] Validate partition values with...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23215#discussion_r239453312
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1396,6 +1396,16 @@ object SQLConf {
 .booleanConf
 .createWithDefault(false)
 
+  val VALIDATE_PARTITION_COLUMNS =
+buildConf("spark.sql.sources.validatePartitionColumns")
+  .internal()
+  .doc("When this option is set to true, partition column values will 
be validated with " +
+"provided schema. If the validation fails, a runtime exception is 
thrown." +
+"When this option is set to false, the partition column value will 
be converted to null " +
+"if it can not be converted to corresponding provided schema.")
--- End diff --

`... can not be casted to ...`
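
The two behaviours described in that config doc can be modelled as below. This is a hypothetical sketch for an integer partition column, with `None` standing in for SQL NULL; it is not the actual partition-parsing code:

```scala
import scala.util.Try

// Hypothetical model of the two behaviours described in the config doc.
object PartitionValueSketch {
  def castToInt(raw: String, validate: Boolean): Option[Int] =
    Try(raw.toInt).toOption match {
      case some @ Some(_) => some
      // Validation on: a value that cannot be cast fails at runtime.
      case None if validate =>
        throw new RuntimeException(s"Failed to cast partition value '$raw' to int")
      // Validation off: the value silently becomes null.
      case None => None
    }
}
```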


---




[GitHub] spark pull request #23215: [SPARK-26263][SQL] Validate partition values with...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23215#discussion_r239453026
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
 ---
@@ -95,6 +95,31 @@ class FileIndexSuite extends SharedSQLContext {
 }
   }
 
+  test("SPARK-26263: Throw exception when partition value can't be 
converted to specific type") {
--- End diff --

`can't be casted to user-specified type`


---




[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23213
  
these 3 combinations LGTM.


---




[GitHub] spark pull request #23244: [SPARK-26289][CORE]cleanup enablePerfMetrics para...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23244#discussion_r239451274
  
--- Diff: 
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---
@@ -209,23 +205,14 @@ public BytesToBytesMap(
   TaskMemoryManager taskMemoryManager,
--- End diff --

If this constructor is called, the `enablePerfMetrics` will be false. Where 
do we use this constructor?


---




[GitHub] spark issue #23248: [SPARK-26293][SQL] Cast exception when having python udf...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/23248
  
cc @icexelloss  @HyukjinKwon @ueshin @viirya @gatorsmile 


---




[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23248#discussion_r239430315
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -60,8 +60,12 @@ private class BatchIterator[T](iter: Iterator[T], batchSize: Int)
 /**
  * A logical plan that evaluates a [[PythonUDF]].
  */
-case class ArrowEvalPython(udfs: Seq[PythonUDF], output: Seq[Attribute], child: LogicalPlan)
-  extends UnaryNode
+case class ArrowEvalPython(
+    udfs: Seq[PythonUDF],
+    output: Seq[Attribute],
+    child: LogicalPlan) extends UnaryNode {
+  override def producedAttributes: AttributeSet = AttributeSet(output.drop(child.output.length))
--- End diff --

A different but related fix, so that `missingAttributes` is calculated correctly.
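
To illustrate why declaring `producedAttributes` matters, here is a minimal sketch of the set arithmetic in plain Python. It is not Catalyst code. It assumes the missing set is computed as the node's references minus the child's output minus the attributes the node produces itself, and the column name `pythonUDF0` is a hypothetical result attribute.

```python
from typing import List, Set


def produced_attributes(output: List[str], child_output: List[str]) -> Set[str]:
    # Mirrors output.drop(child.output.length): everything after the
    # child's columns is what this node itself produces.
    return set(output[len(child_output):])


def missing_attributes(
    references: Set[str], child_output: List[str], produced: Set[str]
) -> Set[str]:
    # Assumed definition: referenced attributes that neither the child
    # nor the node itself supplies.
    return references - set(child_output) - produced


child_output = ["a", "b"]
output = child_output + ["pythonUDF0"]  # hypothetical result column name
references = {"a", "pythonUDF0"}

# Without the override, the node's own result column looks like missing input.
without_fix = missing_attributes(references, child_output, set())

# With the override, the result column is accounted for.
with_fix = missing_attributes(
    references, child_output, produced_attributes(output, child_output)
)
```

Under these assumptions, `without_fix` contains the UDF result column and `with_fix` is empty, which is the behavior the override is meant to restore.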


---




[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23248#discussion_r239430084
  
--- Diff: python/pyspark/sql/tests/test_udf.py ---
@@ -23,7 +23,7 @@
 
 from pyspark import SparkContext
 from pyspark.sql import SparkSession, Column, Row
-from pyspark.sql.functions import UserDefinedFunction
+from pyspark.sql.functions import UserDefinedFunction, udf
--- End diff --

Add the import here, as a lot of tests use it.


---




[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-06 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/23248

[SPARK-26293][SQL] Cast exception when having python udf in subquery

## What changes were proposed in this pull request?

This is a regression introduced by https://github.com/apache/spark/pull/22104 in Spark 2.4.0.

When a Python UDF appears in a subquery, we hit an exception:
```
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.AttributeReference cannot be cast to org.apache.spark.sql.catalyst.expressions.PythonUDF
    at scala.collection.immutable.Stream.map(Stream.scala:414)
    at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:98)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:815)
    ...
```

https://github.com/apache/spark/pull/22104 turned `ExtractPythonUDFs` from a physical rule into an optimizer rule. However, the two behave differently: a physical rule always runs exactly once, whereas an optimizer rule may be applied twice to a query tree even if the rule is located in a batch that only runs once.

For a subquery, the `OptimizeSubqueries` rule executes the entire optimizer on the query plan inside the subquery. Later, the subquery is turned into a join, and the optimizer rules are applied to it again.

Unfortunately, the `ExtractPythonUDFs` rule is not idempotent. When it is applied twice to a query plan inside a subquery, it produces a malformed plan, because on the second application it extracts Python UDFs from Python exec plans.

This PR proposes two changes, to be doubly safe:
1. `ExtractPythonUDFs` should skip Python exec plans, to make the rule idempotent
2. `ExtractPythonUDFs` should skip subqueries
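
The idempotency problem behind change 1 can be sketched with a toy model in plain Python. This is not Catalyst code: the node classes, the `udf:`/`attr:` string encoding, and the `skip_eval_python` flag are all hypothetical names chosen for illustration. The naive rule treats the UDF list of an already-created eval node as ordinary expressions, extracts from it again, and leaves attribute references where UDFs are expected.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class Scan:
    columns: Tuple[str, ...]


@dataclass(frozen=True)
class Project:
    exprs: Tuple[str, ...]  # "udf:NAME" marks a Python UDF call
    child: "Plan"


@dataclass(frozen=True)
class EvalPython:
    udfs: Tuple[str, ...]  # must contain only "udf:NAME" entries
    child: "Plan"


Plan = Union[Scan, Project, EvalPython]


def is_udf(expr: str) -> bool:
    return expr.startswith("udf:")


def extract(plan: Plan, skip_eval_python: bool) -> Plan:
    """Toy extraction rule: move UDF calls into an EvalPython node below."""
    if isinstance(plan, Scan):
        return plan
    child = extract(plan.child, skip_eval_python)
    if isinstance(plan, EvalPython):
        if skip_eval_python:
            # Guarded variant: leave existing eval nodes untouched.
            return EvalPython(plan.udfs, child)
        exprs = plan.udfs
    else:
        exprs = plan.exprs
    udfs = tuple(e for e in exprs if is_udf(e))
    if not udfs:
        return type(plan)(exprs, child)
    # Replace each UDF call with a reference to its result attribute.
    rewritten = tuple("attr:" + e[4:] if is_udf(e) else e for e in exprs)
    return type(plan)(rewritten, EvalPython(udfs, child))


def is_well_formed(plan: Plan) -> bool:
    if isinstance(plan, Scan):
        return True
    if isinstance(plan, EvalPython) and not all(is_udf(u) for u in plan.udfs):
        return False
    return is_well_formed(plan.child)


plan = Project(("a", "udf:f"), Scan(("a",)))

naive_twice = extract(extract(plan, False), False)
guarded_once = extract(plan, True)
guarded_twice = extract(guarded_once, True)
```

In this model `naive_twice` contains an eval node whose UDF list holds `attr:f`, loosely mirroring the `AttributeReference cannot be cast to PythonUDF` failure, while `guarded_twice` equals `guarded_once`.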

## How was this patch tested?

A new test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark python

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23248.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23248


commit 9477fb09b850b981862cb72b0ebdebc5b404a082
Author: Wenchen Fan 
Date:   2018-12-06T11:16:04Z

python udf in subquery




---



