[GitHub] spark pull request #23275: [SPARK-26323][SQL] Scala UDF should still check i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23275#discussion_r240234583 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala --- @@ -88,68 +88,49 @@ sealed trait UserDefinedFunction { private[sql] case class SparkUserDefinedFunction( f: AnyRef, dataType: DataType, -inputTypes: Option[Seq[DataType]], -nullableTypes: Option[Seq[Boolean]], +inputSchemas: Seq[Option[ScalaReflection.Schema]], name: Option[String] = None, nullable: Boolean = true, deterministic: Boolean = true) extends UserDefinedFunction { @scala.annotation.varargs - override def apply(exprs: Column*): Column = { -// TODO: make sure this class is only instantiated through `SparkUserDefinedFunction.create()` -// and `nullableTypes` is always set. -if (inputTypes.isDefined) { - assert(inputTypes.get.length == nullableTypes.get.length) -} - -val inputsNullSafe = nullableTypes.getOrElse { - ScalaReflection.getParameterTypeNullability(f) --- End diff -- Not worth keeping this anymore, as Scala 2.12 is now the default. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23228: [MINOR][DOC] Update the condition description of seriali...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23228 thanks, merging to master!
[GitHub] spark pull request #23275: [SPARK-26323][SQL] Scala UDF should still check i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23275#discussion_r240231883 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -4255,11 +4255,11 @@ object functions { * * @group udf_funcs * @since 2.0.0 + * + * @deprecated("please use the typed `udf` methods", "3.0.0") --- End diff -- With Scala 2.12, type and nullability info must be retrieved at compile time, so this method is not very useful and we should deprecate it.
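A small illustration of the point above: with a typed signature the compiler supplies the input type at the call site, so nullability (primitive vs. reference input) can be derived without inspecting the function object at runtime. This is a sketch that uses only `scala.reflect.ClassTag`; `describeInput` is a hypothetical helper, and Spark's typed `udf` methods rely on `TypeTag`-based schema inference rather than on this code.

```scala
import scala.reflect.ClassTag

object TypedUdfSketch {
  // Hypothetical helper: reports the input class the compiler supplied through the implicit
  // ClassTag, and whether that input can hold null. Primitive classes such as `int` cannot;
  // reference types such as String can. `f` is never called: it only fixes the type `A`,
  // the way a typed udf signature does.
  def describeInput[A](f: A => Any)(implicit tag: ClassTag[A]): (String, Boolean) = {
    val cls = tag.runtimeClass
    (cls.getName, !cls.isPrimitive)
  }
}
```

For example, `describeInput[Int](x => x + 1)` reports a non-nullable `int` input, while `describeInput[String](s => s.length)` reports a nullable `java.lang.String`.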
[GitHub] spark issue #23275: [SPARK-26323][SQL] Scala UDF should still check input ty...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23275 cc @maryannxue @gatorsmile @srowen
[GitHub] spark pull request #23275: [SPARK-26323][SQL] Scala UDF should still check i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23275#discussion_r240230970 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala --- @@ -47,25 +47,13 @@ case class ScalaUDF( function: AnyRef, dataType: DataType, children: Seq[Expression], -inputsNullSafe: Seq[Boolean], -inputTypes: Seq[DataType] = Nil, +@transient inputsNullSafe: Seq[Boolean], +@transient inputTypes: Seq[AbstractDataType] = Nil, udfName: Option[String] = None, nullable: Boolean = true, udfDeterministic: Boolean = true) extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression { - // The constructor for SPARK 2.1 and 2.2 --- End diff -- not useful anymore in Spark 3.0.
[GitHub] spark pull request #23275: [SPARK-26323][SQL] Scala UDF should still check i...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/23275 [SPARK-26323][SQL] Scala UDF should still check input types even if some inputs are of type Any ## What changes were proposed in this pull request? For Scala UDF, when checking input nullability, we will skip inputs with type `Any`, and only check the inputs that provide nullability info. We should do the same for checking input types. ## How was this patch tested? new tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark udf Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23275.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23275 commit 8582607195f12a4c133fb28b59e8a7fce7a97fbb Author: Wenchen Fan Date: 2018-12-10T13:00:17Z Scala UDF should still check input types even if some inputs are of type Any
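A simplified model of the proposed check: inputs declared as `Any` carry no type information and are skipped, while every other input is still validated. The names below (`SimpleType`, `InputSchema`, `inputTypesOk`) are hypothetical stand-ins, not Spark's `ScalaUDF` API, and the sketch only reports a mismatch, whereas Catalyst's `ImplicitCastInputTypes` would first try an implicit cast.

```scala
object UdfInputCheck {
  // Hypothetical stand-ins for Catalyst data types.
  sealed trait SimpleType
  case object IntT extends SimpleType
  case object StringT extends SimpleType

  // Type info derived for one UDF parameter.
  final case class InputSchema(dataType: SimpleType, nullable: Boolean)

  // `schemas(i)` is None when parameter i was declared as `Any`, so nothing is known about it.
  // Such inputs are skipped; every other input must match its declared type.
  def inputTypesOk(schemas: Seq[Option[InputSchema]], actual: Seq[SimpleType]): Boolean =
    schemas.length == actual.length &&
      schemas.zip(actual).forall {
        case (None, _)           => true
        case (Some(schema), tpe) => schema.dataType == tpe
      }
}
```

The point of the change is that one `Any` parameter no longer turns off checking for the other, typed parameters.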
[GitHub] spark pull request #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesM...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23272#discussion_r240189245 --- Diff: core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java --- @@ -667,4 +669,54 @@ public void testPeakMemoryUsed() { } } + @Test + public void avoidDeadlock() throws InterruptedException { +memoryManager.limit(PAGE_SIZE_BYTES); +MemoryMode mode = useOffHeapMemoryAllocator() ? MemoryMode.OFF_HEAP: MemoryMode.ON_HEAP; +TestMemoryConsumer c1 = new TestMemoryConsumer(taskMemoryManager, mode); +BytesToBytesMap map = + new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024); + +Runnable memoryConsumer = new Runnable() { + @Override + public void run() { +int i = 0; +long used = 0; +while (i < 10) { + c1.use(1000); + used += 1000; + i++; +} +c1.free(used); + } +}; + +Thread thread = new Thread(memoryConsumer); + +try { + int i; + for (i = 0; i < 1024; i++) { +final long[] arr = new long[]{i}; +final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8); +loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8); + } + + // Starts to require memory at another memory consumer. + thread.start(); + + BytesToBytesMap.MapIterator iter = map.destructiveIterator(); + for (i = 0; i < 1024; i++) { +iter.next(); + } + assertFalse(iter.hasNext()); +} finally { + map.free(); + thread.join(); --- End diff -- Is this the line where the test hangs without the fix?
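One way to make the answer to this question visible in the test itself: instead of an unbounded `thread.join()`, join with a timeout and assert that the thread finished, so a regression fails quickly instead of hanging the build. A minimal sketch in Scala (the suite itself is Java, where `Thread.join(long)` and `isAlive()` behave the same way); `runAndJoin` is a hypothetical helper, not part of the suite.

```scala
object JoinWithTimeout {
  // Hypothetical helper: runs `body` on a daemon thread and waits at most `timeoutMs` for it.
  // Returns true if the thread finished, false if it is still running (for example, deadlocked).
  def runAndJoin(timeoutMs: Long)(body: => Unit): Boolean = {
    val thread = new Thread(new Runnable { def run(): Unit = body })
    thread.setDaemon(true) // a stuck thread must not keep the JVM alive after the test
    thread.start()
    thread.join(timeoutMs)
    !thread.isAlive
  }
}
```

In the deadlock test this would become an assertion such as `assert(runAndJoin(...)(...))`, which fails with a clear message rather than blocking forever.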
[GitHub] spark issue #23251: [SPARK-26300][SS] Remove a redundant `checkForStreaming`...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23251 cc @zsxwing
[GitHub] spark issue #23262: [SPARK-26312][SQL]Converting converters in RDDConversion...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23262 LGTM, can you update the PR title and description?
[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23262#discussion_r240180713 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala --- @@ -416,7 +416,12 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with output: Seq[Attribute], rdd: RDD[Row]): RDD[InternalRow] = { if (relation.relation.needConversion) { - execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType)) + val converters = RowEncoder(StructType.fromAttributes(output)) + rdd.mapPartitions { iterator => +iterator.map { r => --- End diff -- nit: `iterator.map(converters.toRow)`
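The nit relies on eta-expansion: a method can be passed where a function is expected, so a lambda that only forwards its argument is redundant. A self-contained sketch; `toRow` here is a hypothetical stand-in for `converters.toRow`, operating on plain sequences instead of Spark rows.

```scala
object EtaExpansionNit {
  // Hypothetical stand-in for the encoder's `converters.toRow` in the diff.
  def toRow(r: Seq[Any]): Vector[Any] = r.toVector

  // Verbose form: the lambda only forwards its argument.
  def convertVerbose(it: Iterator[Seq[Any]]): Iterator[Vector[Any]] = it.map { r => toRow(r) }

  // Suggested form: pass the method itself; the compiler eta-expands it into a function.
  def convertConcise(it: Iterator[Seq[Any]]): Iterator[Vector[Any]] = it.map(toRow)
}
```

Both forms produce the same elements; the concise one just avoids naming a throwaway parameter.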
[GitHub] spark issue #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesMap.MapI...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23272 Have you seen any bug reports caused by this deadlock?
[GitHub] spark pull request #23272: [SPARK-26265][Core] Fix deadlock in BytesToBytesM...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23272#discussion_r240178993 --- Diff: core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java --- @@ -38,12 +38,14 @@ public long spill(long size, MemoryConsumer trigger) throws IOException { return used; } - void use(long size) { + @VisibleForTesting --- End diff -- This is a test class, so we can just make it public.
[GitHub] spark pull request #23204: Revert "[SPARK-21052][SQL] Add hash map metrics t...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23204#discussion_r240104812 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala --- @@ -213,10 +213,6 @@ trait HashJoin { s"BroadcastHashJoin should not take $x as the JoinType") } -// At the end of the task, we update the avg hash probe. -TaskContext.get().addTaskCompletionListener[Unit](_ => --- End diff -- In this file, the `join` method still takes `avgHashProbe: SQLMetric`; we should remove that parameter as well.
[GitHub] spark issue #23255: [SPARK-26307] [SQL] Fix CTAS when INSERT a partitioned t...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23255 thanks, merging to master/2.4/2.3!
[GitHub] spark issue #23211: [SPARK-19712][SQL] Move PullupCorrelatedPredicates and R...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23211 to make the PR smaller, can we add an individual rule `PushdownLeftSemiOrAntiJoin` first?
[GitHub] spark pull request #23211: [SPARK-19712][SQL] Move PullupCorrelatedPredicate...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23211#discussion_r240097479 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -984,6 +1002,28 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)) +// Similar to the above Filter over Project +// LeftSemi/LeftAnti over Project +case join @ Join(p @ Project(pList, grandChild), rightOp, LeftSemiOrAnti(joinType), joinCond) --- End diff -- Shall we create a new rule `PushdownLeftSemiOrAntiJoin`?
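A toy version of what such a standalone rule might do. Because a LeftSemi or LeftAnti join only filters rows of its left side and outputs only left-side columns, the join can be evaluated beneath a deterministic `Project`. The plan classes and `pushDown` are hypothetical simplifications; the real rule must also substitute aliases in the join condition (as the `replaceAlias` call in the Filter case does), which this sketch sidesteps by assuming the condition only references columns the `Project` passes through unchanged.

```scala
object LeftSemiPushdownSketch {
  // Toy logical plan nodes (hypothetical, loosely modeled on Catalyst).
  sealed trait Plan
  final case class Relation(name: String, output: Seq[String]) extends Plan
  final case class Project(columns: Seq[String], child: Plan, deterministic: Boolean = true) extends Plan
  final case class Join(left: Plan, right: Plan, joinType: String, condition: String) extends Plan

  private val semiOrAnti = Set("LeftSemi", "LeftAnti")

  // Join(Project(child), right) => Project(Join(child, right)) for LeftSemi/LeftAnti joins
  // over a deterministic Project; any other plan is returned unchanged.
  def pushDown(plan: Plan): Plan = plan match {
    case Join(Project(cols, grandChild, true), right, jt, cond) if semiOrAnti(jt) =>
      Project(cols, Join(grandChild, right, jt, cond))
    case other => other
  }
}
```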
[GitHub] spark pull request #23211: [SPARK-19712][SQL] Move PullupCorrelatedPredicate...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23211#discussion_r240097255 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -649,13 +664,16 @@ object CollapseProject extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case p1 @ Project(_, p2: Project) => - if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList)) { + if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) || +ScalarSubquery.hasScalarSubquery(p1.projectList) || +ScalarSubquery.hasScalarSubquery(p2.projectList)) { --- End diff -- Why did we allow it before?
[GitHub] spark issue #23204: Revert "[SPARK-21052][SQL] Add hash map metrics to join"
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23204 can we follow https://github.com/apache/spark/pull/23204#issuecomment-445510026 and create a new ticket?
[GitHub] spark pull request #23211: [SPARK-19712][SQL] Move PullupCorrelatedPredicate...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23211#discussion_r240092936 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala --- @@ -267,6 +267,17 @@ object ScalarSubquery { case _ => false }.isDefined } + + def hasScalarSubquery(e: Expression): Boolean = { +e.find { + case s: ScalarSubquery => true + case _ => false +}.isDefined + } + + def hasScalarSubquery(e: Seq[Expression]): Boolean = { +e.find(hasScalarSubquery(_)).isDefined --- End diff -- `e.exists(hasScalarSubquery)`
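A self-contained sketch of the suggested simplification on a toy expression tree: `exists` states the same check as `find(...).isDefined` more directly. The `Expr` classes are hypothetical, and the sketch gives the `Seq` variant a distinct name (`anyHasScalarSubquery`) rather than overloading `hasScalarSubquery` as the diff does, purely to keep the example unambiguous.

```scala
object SubquerySearch {
  // Toy expression tree (hypothetical); Catalyst's TreeNode provides a similar `find`.
  sealed trait Expr {
    def children: Seq[Expr]
    // Pre-order search: test this node first, then each child subtree in order.
    def find(p: Expr => Boolean): Option[Expr] =
      if (p(this)) Some(this)
      else children.iterator.map(_.find(p)).collectFirst { case Some(e) => e }
  }
  final case class Literal(value: Int) extends Expr { def children: Seq[Expr] = Nil }
  final case class Add(left: Expr, right: Expr) extends Expr { def children: Seq[Expr] = Seq(left, right) }
  final case class ScalarSubquery(id: Int) extends Expr { def children: Seq[Expr] = Nil }

  def hasScalarSubquery(e: Expr): Boolean =
    e.find(_.isInstanceOf[ScalarSubquery]).isDefined

  // Equivalent to `es.find(hasScalarSubquery).isDefined`, but says what it means.
  def anyHasScalarSubquery(es: Seq[Expr]): Boolean = es.exists(hasScalarSubquery)
}
```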
[GitHub] spark issue #23248: [SPARK-26293][SQL] Cast exception when having python udf...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23248 If it's fine for 2.4, I think it's also fine for master as a temporary fix? We can create another ticket to clean up the subquery optimization hack. IIUC https://github.com/apache/spark/pull/23211 may help with it.
[GitHub] spark pull request #23258: [SPARK-23375][SQL][FOLLOWUP][TEST] Test Sort metr...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23258#discussion_r240090371 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -182,10 +182,13 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared } test("Sort metrics") { -// Assume the execution plan is -// WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1)) -val ds = spark.range(10).sort('id) -testSparkPlanMetrics(ds.toDF(), 2, Map.empty) +// Assume the execution plan with node id is +// Sort(nodeId = 0) +// Exchange(nodeId = 1) +// LocalTableScan(nodeId = 2) +val df = Seq(1, 3, 2).toDF("id").sort('id) +testSparkPlanMetrics(df, 2, Map.empty) --- End diff -- can we just check something like `sortTime > 0`?
[GitHub] spark pull request #23201: [SPARK-26246][SQL] Infer date and timestamp types...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23201#discussion_r240090192 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala --- @@ -121,7 +122,26 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { DecimalType(bigDecimal.precision, bigDecimal.scale) } decimalTry.getOrElse(StringType) - case VALUE_STRING => StringType + case VALUE_STRING => +val stringValue = parser.getText --- End diff -- The partition feature is shared by all file-based sources, so I think it's overkill to make it behave differently for different data sources. The simplest solution, to me, is to ask all text sources to follow the behavior of partition value type inference.
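A minimal sketch of a shared string-value inference step that every text source could call, trying the narrower types before falling back to `StringType`. Assumptions: ISO-8601 parsing from `java.time` stands in for Spark's configurable date and timestamp formats, type names are returned as plain strings, and the numeric branches of partition value inference are omitted; `inferStringType` is a hypothetical name.

```scala
import java.time.{LocalDate, LocalDateTime}
import scala.util.Try

object StringTypeInference {
  // Tries the narrower types first (date, then timestamp) and falls back to a string.
  // Assumption: ISO-8601 defaults stand in for Spark's `dateFormat` / `timestampFormat`.
  def inferStringType(value: String): String =
    if (Try(LocalDate.parse(value)).isSuccess) "DateType"
    else if (Try(LocalDateTime.parse(value)).isSuccess) "TimestampType"
    else "StringType"
}
```

Because the same function would be used for partition values and for JSON/CSV string values, the two paths could not drift apart.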
[GitHub] spark pull request #23265: [2.4][SPARK-26021][SQL][FOLLOWUP] only deal with ...
Github user cloud-fan closed the pull request at: https://github.com/apache/spark/pull/23265
[GitHub] spark issue #23228: [MINOR][DOC] Update the condition description of seriali...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23228 retest this please
[GitHub] spark issue #23204: Revert "[SPARK-21052][SQL] Add hash map metrics to join"
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23204 If we can quickly finish #23214 (within several days), let's go for it. But if we can't, I'd suggest we do the partial revert first to fix the perf regression, and add back the metrics later.
[GitHub] spark issue #23228: [MINOR][DOC]The condition description of serialized shuf...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23228 LGTM, cc @jiangxb1987
[GitHub] spark pull request #23228: [MINOR][DOC]The condition description of serializ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23228#discussion_r240036698 --- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala --- @@ -33,10 +33,10 @@ import org.apache.spark.shuffle._ * Sort-based shuffle has two different write paths for producing its map output files: * * - Serialized sorting: used when all three of the following conditions hold: - *1. The shuffle dependency specifies no aggregation or output ordering. + *1. The shuffle dependency specifies no map-side combine. --- End diff -- looks right to me, according to https://github.com/apache/spark/blob/d5dadbf30d5429c36ec3d5c2845a71c2717fd6f3/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala#L195
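The three conditions can be written as one predicate. A hedged sketch: only condition 1 appears in this excerpt; the serializer-relocation and partition-count (at most 16777216, i.e. 2^24 output partitions) conditions are my reading of the rest of the `SortShuffleManager` doc comment, and `DepInfo` is a hypothetical summary of a shuffle dependency, not Spark's `ShuffleDependency` class.

```scala
object SerializedShuffleCheck {
  // Upper bound on output partitions for the serialized path: 2^24 = 16777216.
  val MaxPartitionsForSerializedMode: Int = 1 << 24

  // Hypothetical summary of the shuffle dependency properties the conditions look at.
  final case class DepInfo(
      mapSideCombine: Boolean,
      serializerSupportsRelocation: Boolean,
      numPartitions: Int)

  // All three conditions must hold for serialized sorting to be used.
  def canUseSerializedShuffle(dep: DepInfo): Boolean =
    !dep.mapSideCombine &&
      dep.serializerSupportsRelocation &&
      dep.numPartitions <= MaxPartitionsForSerializedMode
}
```

Written this way, condition 1 is visibly about map-side combine only, which is why the old "no aggregation or output ordering" wording was inaccurate.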
[GitHub] spark issue #23253: [SPARK-26303][SQL] Return partial results for bad JSON r...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23253 LGTM except for a code style comment
[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23253#discussion_r240036498 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -347,17 +347,28 @@ class JacksonParser( schema: StructType, fieldConverters: Array[ValueConverter]): InternalRow = { val row = new GenericInternalRow(schema.length) +var badRecordException: Option[Throwable] = None + while (nextUntil(parser, JsonToken.END_OBJECT)) { schema.getFieldIndex(parser.getCurrentName) match { case Some(index) => - row.update(index, fieldConverters(index).apply(parser)) - + try { +row.update(index, fieldConverters(index).apply(parser)) + } catch { +case NonFatal(e) => + badRecordException = badRecordException.orElse(Some(e)) + parser.skipChildren() + } case None => parser.skipChildren() } } -row +if (badRecordException.isEmpty) { + row +} else { + throw BadRecordException(() => UTF8String.EMPTY_UTF8, () => Some(row), badRecordException.get) --- End diff -- Or we can create a new exception type that only carries the row and the exception, and use it here.
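A sketch of the control flow in this diff combined with the suggested dedicated exception: keep converting the remaining fields, remember only the first failure, and throw an exception that carries the partially filled row. `PartialResultException` and `convertRow` are hypothetical names, and the row is a plain `Array[Any]` rather than a `GenericInternalRow`.

```scala
import scala.util.control.NonFatal

object PartialRowParsing {
  // Hypothetical exception type: carries only the partial row and the first failure,
  // not the original record text (which the caller can attach later).
  final class PartialResultException(val partialRow: Array[Any], cause: Throwable)
    extends Exception(cause)

  // Each field is (target index, converter). A failing converter leaves its slot null,
  // and the remaining fields are still converted.
  def convertRow(fields: Seq[(Int, () => Any)], width: Int): Array[Any] = {
    val row = new Array[Any](width)
    var firstError: Option[Throwable] = None
    fields.foreach { case (index, convert) =>
      try {
        row(index) = convert()
      } catch {
        case NonFatal(e) => firstError = firstError.orElse(Some(e))
      }
    }
    firstError match {
      case None    => row
      case Some(e) => throw new PartialResultException(row, e)
    }
  }
}
```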
[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23253#discussion_r240036489 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -347,17 +347,28 @@ class JacksonParser( schema: StructType, fieldConverters: Array[ValueConverter]): InternalRow = { val row = new GenericInternalRow(schema.length) +var badRecordException: Option[Throwable] = None + while (nextUntil(parser, JsonToken.END_OBJECT)) { schema.getFieldIndex(parser.getCurrentName) match { case Some(index) => - row.update(index, fieldConverters(index).apply(parser)) - + try { +row.update(index, fieldConverters(index).apply(parser)) + } catch { +case NonFatal(e) => + badRecordException = badRecordException.orElse(Some(e)) + parser.skipChildren() + } case None => parser.skipChildren() } } -row +if (badRecordException.isEmpty) { + row +} else { + throw BadRecordException(() => UTF8String.EMPTY_UTF8, () => Some(row), badRecordException.get) --- End diff -- Add a comment saying that we don't know the original record here, and that it will be filled in later.
[GitHub] spark pull request #23201: [SPARK-26246][SQL] Infer date and timestamp types...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23201#discussion_r240036225 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala --- @@ -121,7 +122,26 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { DecimalType(bigDecimal.precision, bigDecimal.scale) } decimalTry.getOrElse(StringType) - case VALUE_STRING => StringType + case VALUE_STRING => +val stringValue = parser.getText --- End diff -- do you mean partition value type inference will have a different result than json value type inference?
[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23208 Let's move the high level discussion to the doc: https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing
[GitHub] spark pull request #23266: [SPARK-26313][SQL] move read related methods from...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23266#discussion_r240029373 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java --- @@ -20,14 +20,27 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.sources.v2.reader.Scan; import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.types.StructType; /** - * An empty mix-in interface for {@link Table}, to indicate this table supports batch scan. - * - * If a {@link Table} implements this interface, its {@link Table#newScanBuilder(DataSourceOptions)} - * must return a {@link ScanBuilder} that builds {@link Scan} with {@link Scan#toBatch()} - * implemented. - * + * A mix-in interface for {@link Table} to provide data reading ability of batch processing. */ @Evolving -public interface SupportsBatchRead extends Table { } +public interface SupportsBatchRead extends Table { + + /** + * Returns the schema of this table. + */ + StructType schema(); --- End diff -- I'm not sure about this. Maybe it's OK to leave `schema` in `Table`, and ask write-only tables to report an empty schema.
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r240028574 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,52 +17,49 @@ package org.apache.spark.sql.execution.datasources.v2 -import java.util.UUID - -import scala.collection.JavaConverters._ +import java.util.{Optional, UUID} import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport import org.apache.spark.sql.types.StructType /** - * A logical plan representing a data source v2 scan. + * A logical plan representing a data source v2 table. * - * @param source An instance of a [[DataSourceV2]] implementation. - * @param options The options for this scan. Used to create fresh [[BatchWriteSupport]]. - * @param userSpecifiedSchema The user-specified schema for this scan. + * @param table The table that this relation represents. + * @param options The options for this table operation. It's used to create fresh [[ScanBuilder]] + *and [[BatchWriteSupport]]. */ case class DataSourceV2Relation( -// TODO: remove `source` when we finish API refactor for write. -source: TableProvider, -table: SupportsBatchRead, +table: Table, output: Seq[AttributeReference], -options: Map[String, String], -userSpecifiedSchema: Option[StructType] = None) +// TODO: use a simple case insensitive map instead. 
+options: DataSourceOptions) --- End diff -- It was done in multiple places before: https://github.com/apache/spark/pull/23208/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23L62 https://github.com/apache/spark/pull/23208/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23L153 https://github.com/apache/spark/pull/23208/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23L171 If you feel strongly about it, I can follow that and update the code.
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r240028515 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java --- @@ -25,7 +25,10 @@ * The base interface for v2 data sources which don't have a real catalog. Implementations must * have a public, 0-arg constructor. * - * The major responsibility of this interface is to return a {@link Table} for read/write. + * The major responsibility of this interface is to return a {@link Table} for read/write. If you + * want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter` + * with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The + * table schema can be empty in this case. --- End diff -- I'm not convinced it's safe to remove `SaveMode` right away, when only an `Append` operator is currently implemented. If we do, `DataFrameWriter.save` will need to throw an exception in many cases, namely whenever the `mode` is not append. I don't think this is acceptable right now. Can we postpone discussing the removal of `SaveMode` until all the necessary new write operators are implemented?
[GitHub] spark pull request #23266: [SPARK-26313][SQL] move read related methods from...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/23266 [SPARK-26313][SQL] move read related methods from Table to read related mix-in traits ## What changes were proposed in this pull request? As discussed in https://github.com/apache/spark/pull/23208/files#r239684490 , we should put read-related methods in read-related mix-in traits like `SupportsBatchRead`, to support write-only tables. In the `Append` operator, we should skip schema validation if the table is write-only. This will be done when we finish the write API refactor in https://github.com/apache/spark/pull/23208/ ## How was this patch tested? existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark ds-read Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23266.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23266 commit da520cc6e7daa36c7d7fabdb0a08d4b4341250b9 Author: Wenchen Fan Date: 2018-12-09T08:32:51Z move read related methods from Table to read related mix-in traits
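A toy Scala rendering of the proposed split (the real interfaces are Java, and the method shapes here are simplified assumptions): `schema` lives on the read mix-in, and an append validates the incoming columns only when the target table can report a schema. `ReadMixInSketch` and `validateAppend` are hypothetical names.

```scala
object ReadMixInSketch {
  // Hypothetical Scala versions of the interfaces (the real ones are Java and richer).
  trait Table { def name: String }
  // Read-related methods such as `schema` move to the read mix-in.
  trait SupportsBatchRead extends Table { def schema: Seq[String] }

  // Append validates incoming columns only when the table can report a schema;
  // a write-only table (one that does not mix in SupportsBatchRead) skips validation.
  def validateAppend(table: Table, incoming: Seq[String]): Either[String, Unit] = table match {
    case readable: SupportsBatchRead if readable.schema != incoming =>
      Left(s"schema mismatch for ${readable.name}: expected ${readable.schema}, got $incoming")
    case _ => Right(())
  }
}
```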
[GitHub] spark issue #23266: [SPARK-26313][SQL] move read related methods from Table ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23266 cc @rdblue @HyukjinKwon @gatorsmile
[GitHub] spark issue #23265: [2.4][SPARK-26021][SQL][FOLLOWUP] only deal with NaN and...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23265 retest this please
[GitHub] spark issue #23259: [SPARK-26215][SQL][WIP] Define reserved/non-reserved key...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23259 thanks @maropu for starting it! > Which SQL standard does Spark SQL follow (e.g., 2011 or 2016)? I think SQL 2011 is good, but if we can't find a public version, maybe it's also OK to follow [postgres](https://github.com/postgres/postgres/blob/ee2b37ae044f34851baba69e9ba737077326414e/src/backend/parser/gram.y#L15366) > Where should we handle reserved key words? I think it should be `SqlBase.g4`, but one problem is that the g4 file defines `non-reserved` keywords, not `reserved` ones. Maybe we need to update it. > Where should we document the list of reserved/non-reserved key words? I think the new files you created in this PR are a good place to document it
[GitHub] spark pull request #23258: [SPARK-23375][SQL][FOLLOWUP][TEST] Test Sort metr...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23258#discussion_r240026727 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -182,10 +182,13 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared } test("Sort metrics") { -// Assume the execution plan is -// WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1)) -val ds = spark.range(10).sort('id) -testSparkPlanMetrics(ds.toDF(), 2, Map.empty) +// Assume the execution plan with node id is +// Sort(nodeId = 0) +// Exchange(nodeId = 1) +// LocalTableScan(nodeId = 2) +val df = Seq(1, 3, 2).toDF("id").sort('id) +testSparkPlanMetrics(df, 2, Map.empty) --- End diff -- can we check the metrics of `SortExec` here?
[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23249#discussion_r240026485 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala --- @@ -118,10 +115,12 @@ case class HashClusteredDistribution( /** * Represents data where tuples have been ordered according to the `ordering` - * [[Expression Expressions]]. This is a strictly stronger guarantee than - * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the - * same value for the ordering expressions are contiguous and will never be split across - * partitions. + * [[Expression Expressions]]. Its requirement is defined as the following: + * - Given any 2 adjacent partitions, all the rows of the second partition must be larger than or + * equal to any row in the first partition, according to the `ordering` expressions. --- End diff -- Note that only sort requires `OrderedDistribution`, and a global sort doesn't care whether equal rows span partitions. What this doc defines is the requirement. When designing protocols, it's important to make requirements as weak as possible and guarantees as strong as possible.
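The adjacent-partitions requirement quoted in this diff can be written as a small predicate. The following is a minimal Scala sketch over plain in-memory partitions, not Spark's API: the `OrderedDistributionCheck` name is hypothetical, and skipping empty partitions is an assumption of the sketch. It only compares partition boundaries, so rows inside a partition may be unsorted and equal rows may appear on both sides of a boundary.

```scala
object OrderedDistributionCheck {
  /**
   * True when, for every pair of adjacent non-empty partitions, every row of the later
   * partition is >= every row of the earlier one under `ord`. Equal rows may span a
   * boundary, and rows within a partition may be in any order.
   */
  def satisfies[T](partitions: Seq[Seq[T]])(implicit ord: Ordering[T]): Boolean = {
    // Assumption: empty partitions hold no rows, so they are skipped.
    val nonEmpty = partitions.filter(_.nonEmpty)
    nonEmpty.zip(nonEmpty.drop(1)).forall { case (prev, next) =>
      // Comparing the max of one side with the min of the other covers every row pair.
      ord.lteq(prev.max, next.min)
    }
  }
}
```

For example, `Seq(Seq(1, 2), Seq(2, 3), Seq(5))` satisfies the check even though `2` appears in two partitions, which is the weaker requirement the comment argues for.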
[GitHub] spark issue #23255: [SPARK-26307] [SQL] Fix CTAS when INSERT a partitioned t...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23255 LGTM
[GitHub] spark pull request #23255: [SPARK-26307] [SQL] Fix CTAS when INSERT a partit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23255#discussion_r240026441 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala --- @@ -752,6 +752,17 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter } } + test("CTAS: INSERT a partitioned table using Hive serde") { --- End diff -- +1
[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23262#discussion_r240026394 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala --- @@ -53,7 +53,7 @@ object RDDConversions { data.mapPartitions { iterator => val numColumns = outputTypes.length val mutableRow = new GenericInternalRow(numColumns) - val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter) + val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter).toArray --- End diff -- shall we use `RowEncoder` here?
[GitHub] spark pull request #23262: [SPARK-26312][SQL]Converting converters in RDDCon...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23262#discussion_r240026388 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala --- @@ -33,7 +33,7 @@ object RDDConversions { data.mapPartitions { iterator => val numColumns = outputTypes.length val mutableRow = new GenericInternalRow(numColumns) - val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter) + val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter).toArray --- End diff -- shall we use `ExpressionEncoder` here?
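Both `RDDConversions` diffs append `.toArray` to the converter sequence. A plausible motivation, sketched below as an assumption rather than a statement of the PR's intent, is that the converters are looked up by column index once per row, and indexed access is constant-time on an `Array` but linear on a `List`. The `Converter` alias and `convertRow` helper are hypothetical stand-ins, not the real `CatalystTypeConverters` API.

```scala
object ConverterLoopSketch {
  // Hypothetical stand-in for the functions produced by createToCatalystConverter.
  type Converter = Any => Any

  // Converts one row into `out`, looking converters up by column index.
  // On an Array, converters(i) is O(1); on a List it would walk i nodes per lookup.
  def convertRow(row: IndexedSeq[Any], converters: Array[Converter], out: Array[Any]): Unit = {
    var i = 0
    while (i < converters.length) {
      out(i) = converters(i)(row(i))
      i += 1
    }
  }
}
```

Because this loop runs for every row of every partition, the per-lookup cost is paid many times, which is why an indexed collection is the usual choice here.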
[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r240026330 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper { expressions.flatMap(collectEvaluableUDFs) } - def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { -case plan: LogicalPlan => extract(plan) + def apply(plan: LogicalPlan): LogicalPlan = plan match { +// SPARK-26293: A subquery will be rewritten into join later, and will go through this rule +// eventually. Here we skip subquery, as Python UDF only needs to be extracted once. +case _: Subquery => plan --- End diff -- I think you have a point here. If a subquery will be converted to a join anyway, why do we need to optimize it ahead of time? That's something we can discuss later, though. cc @dilipbiswal for the subquery question.
[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23253#discussion_r240026245 --- Diff: docs/sql-migration-guide-upgrade.md --- @@ -35,7 +35,9 @@ displayTitle: Spark SQL Upgrading Guide - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`. - - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully. + - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, the returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully. + + - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, the returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully. --- End diff -- does `from_csv` support it?
[GitHub] spark pull request #23253: [SPARK-26303][SQL] Return partial results for bad...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23253#discussion_r240026237 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala --- @@ -229,6 +229,11 @@ private[json] trait TestJsonData { """{"date": "27/10/2014 18:30"}""" :: """{"date": "28/01/2016 20:00"}""" :: Nil))(Encoders.STRING) + def badRecords: Dataset[String] = --- End diff -- if it's only used in one test, let's move it to that test
[GitHub] spark issue #23204: Revert "[SPARK-21052][SQL] Add hash map metrics to join"
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23204 +1
[GitHub] spark pull request #23265: [2.4][SPARK-26021][SQL][FOLLOWUP] only deal with ...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/23265 [2.4][SPARK-26021][SQL][FOLLOWUP] only deal with NaN and -0.0 in UnsafeWriter backport https://github.com/apache/spark/pull/23239 to 2.4 - ## What changes were proposed in this pull request? A followup of https://github.com/apache/spark/pull/23043 There are 4 places where we need to deal with NaN and -0.0: 1. Comparison expressions: `-0.0` and `0.0` should be treated as the same, and different NaNs should be treated as the same. 2. Join keys: `-0.0` and `0.0` should be treated as the same, and different NaNs should be treated as the same. 3. Grouping keys: `-0.0` and `0.0` should be assigned to the same group, and different NaNs should be assigned to the same group. 4. Window partition keys: `-0.0` and `0.0` should be treated as the same, and different NaNs should be treated as the same. Case 1 is OK: our comparison already handles NaN and -0.0, and for struct/array/map we recursively compare the fields/elements. Cases 2, 3 and 4 are problematic, as they compare the `UnsafeRow` binary directly: different NaNs have different binary representations, and so do -0.0 and 0.0. A simple fix is to normalize float/double values when building unsafe data (`UnsafeRow`, `UnsafeArrayData`, `UnsafeMapData`); then we don't need to worry about it anymore. Following this direction, this PR moves the handling of NaN and -0.0 from `Platform` to `UnsafeWriter`, so that places like `UnsafeRow.setFloat` no longer handle them, which reduces the perf overhead. It's also easier to add comments explaining why we do it in `UnsafeWriter`. ## How was this patch tested?
existing tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark minor Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23265.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23265 commit 6a837c019eaf7bc9907715a54778bfbb339f3342 Author: Wenchen Fan Date: 2018-12-08T19:18:09Z [SPARK-26021][SQL][FOLLOWUP] only deal with NaN and -0.0 in UnsafeWriter A followup of https://github.com/apache/spark/pull/23043 There are 4 places we need to deal with NaN and -0.0: 1. comparison expressions. `-0.0` and `0.0` should be treated as same. Different NaNs should be treated as same. 2. Join keys. `-0.0` and `0.0` should be treated as same. Different NaNs should be treated as same. 3. grouping keys. `-0.0` and `0.0` should be assigned to the same group. Different NaNs should be assigned to the same group. 4. window partition keys. `-0.0` and `0.0` should be treated as same. Different NaNs should be treated as same. The case 1 is OK. Our comparison already handles NaN and -0.0, and for struct/array/map, we will recursively compare the fields/elements. Case 2, 3 and 4 are problematic, as they compare `UnsafeRow` binary directly, and different NaNs have different binary representation, and the same thing happens for -0.0 and 0.0. To fix it, a simple solution is: normalize float/double when building unsafe data (`UnsafeRow`, `UnsafeArrayData`, `UnsafeMapData`). Then we don't need to worry about it anymore. Following this direction, this PR moves the handling of NaN and -0.0 from `Platform` to `UnsafeWriter`, so that places like `UnsafeRow.setFloat` will not handle them, which reduces the perf overhead. It's also easier to add comments explaining why we do it in `UnsafeWriter`. existing tests Closes #23239 from cloud-fan/minor. 
Authored-by: Wenchen Fan Signed-off-by: Dongjoon Hyun
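The normalization described in this PR can be illustrated in plain Scala. The sketch below follows the spirit of the described `UnsafeWriter` change but is not Spark's code, and the object and method names are hypothetical. It maps every NaN bit pattern to the canonical `Double.NaN` and maps `-0.0` to `0.0`, so comparing raw bits, which is effectively what a binary `UnsafeRow` comparison does, agrees with value equality in these cases.

```scala
object FloatNormalizationSketch {
  // Canonicalize the two cases where semantically equal doubles differ in their bits.
  def normalize(d: Double): Double =
    if (d.isNaN) Double.NaN   // every NaN payload -> the canonical NaN
    else if (d == -0.0) 0.0   // IEEE 754 has -0.0 == 0.0, so 0.0 also maps to itself
    else d

  // Raw IEEE 754 bits, as a byte-wise comparison of unsafe data would see them.
  def bits(d: Double): Long = java.lang.Double.doubleToRawLongBits(d)
}
```

Without normalization, `bits(-0.0)` and `bits(0.0)` differ even though `-0.0 == 0.0`. After normalization, both produce the same bits. Float values could be handled the same way with `java.lang.Float.floatToRawIntBits`.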
[GitHub] spark issue #23265: [2.4][SPARK-26021][SQL][FOLLOWUP] only deal with NaN and...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23265 cc @dongjoon-hyun
[GitHub] spark pull request #23201: [SPARK-26246][SQL] Infer date and timestamp types...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23201#discussion_r240022552 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala --- @@ -121,7 +122,26 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { DecimalType(bigDecimal.precision, bigDecimal.scale) } decimalTry.getOrElse(StringType) - case VALUE_STRING => StringType + case VALUE_STRING => +val stringValue = parser.getText --- End diff -- If we switch the order here, we don't need the length check [here](https://github.com/apache/spark/pull/23201/files#diff-e925de14239f40430d05f9ffd0360f10R130), right?
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23207 thanks, merging to master!
[GitHub] spark issue #23204: Revert "[SPARK-21052][SQL] Add hash map metrics to join"
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23204 According to https://github.com/apache/spark/pull/23214#issuecomment-443999282, the hash join metrics are implemented incorrectly. I think it's fine to revert them and re-implement them later. @JkSelf can you address the comments and only revert the hash join part? thanks!
[GitHub] spark issue #23249: [SPARK-26297][SQL] improve the doc of Distribution/Parti...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23249 retest this please
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Extract Python UDFs at the end...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r239738437 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala --- @@ -31,7 +31,8 @@ class SparkOptimizer( override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+ Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+ -Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+ +Batch("Extract Python UDFs", Once, + Seq(ExtractPythonUDFFromAggregate, ExtractPythonUDFs): _*) :+ --- End diff -- but we already have `ExtractPythonUDFFromAggregate` here...
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239736660 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -78,6 +80,7 @@ object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" + private val NORMALIZE_TIMING_METRIC = "normalizeTiming" --- End diff -- `private val NS_TIMING_METRIC = "nsTiming"`
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239735814 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -78,6 +80,7 @@ object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" + private val NORMALIZE_TIMING_METRIC = "normalizeTiming" --- End diff -- Actually I think your previous naming is good, sorry for the back and forth.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239735425 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -78,6 +80,7 @@ object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" + private val NORMALIZE_TIMING_METRIC = "normalizeTiming" --- End diff -- maybe `nsToMsTiming`?
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239735015 --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.MapStatus + +/** + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask. + */ +private[spark] class ShuffleWriteProcessor extends Serializable with Logging { + + /** + * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy + * reporter for both local accumulator and original reporter updating. As the reporter is a + * per-row operator, here need a careful consideration on performance. + */ + def createMetricsReporter(context: TaskContext): ShuffleWriteMetricsReporter = { --- End diff -- this can be protected? 
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239734920 --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.MapStatus + +/** + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask. + */ +private[spark] class ShuffleWriteProcessor extends Serializable with Logging { + + /** + * Create a [[ShuffleWriteMetricsReporter]] from the task context, always return a proxy + * reporter for both local accumulator and original reporter updating. As the reporter is a --- End diff -- `always return a proxy reporter for both local accumulator and original reporter updating` is it stale?
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r239733875 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala --- @@ -95,9 +77,116 @@ case class CreateHiveTableAsSelectCommand( Seq.empty[Row] } + // Returns `DataWritingCommand` used to write data when the table exists. + def writingCommandForExistingTable( +catalog: SessionCatalog, +tableDesc: CatalogTable): DataWritingCommand + + // Returns `DataWritingCommand` used to write data when the table doesn't exist. + def writingCommandForNewTable( +catalog: SessionCatalog, +tableDesc: CatalogTable): DataWritingCommand + + override def argString: String = { s"[Database:${tableDesc.database}, " + s"TableName: ${tableDesc.identifier.table}, " + s"InsertIntoHiveTable]" } } + +/** + * Create table and insert the query result into it. + * + * @param tableDesc the table description, which may contain serde, storage handler etc. + * @param query the query whose result will be insert into the new relation + * @param mode SaveMode + */ +case class CreateHiveTableAsSelectCommand( +tableDesc: CatalogTable, +query: LogicalPlan, +outputColumnNames: Seq[String], +mode: SaveMode) + extends CreateHiveTableAsSelectBase { + + override def writingCommandForExistingTable( + catalog: SessionCatalog, + tableDesc: CatalogTable): DataWritingCommand = { +InsertIntoHiveTable( + tableDesc, + Map.empty, --- End diff -- or open a new PR to allow CTAS for partitioned hive table first?
[GitHub] spark issue #23239: [SPARK-26021][SQL][followup] only deal with NaN and -0.0...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23239 I checked the original PR that handles NaN: https://github.com/apache/spark/commit/c032b0bf92130dc4facb003f0deaeb1228aefded It didn't add end-to-end tests, so I added 2 new tests.
[GitHub] spark issue #23239: [SPARK-26021][SQL][followup] only deal with NaN and -0.0...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23239 Yes, it is. `UnsafeProjection` always normalizes NaN and -0.0, and Spark uses `UnsafeProjection` to produce output, so users can't distinguish them.
[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23249#discussion_r239690226 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala --- @@ -22,13 +22,12 @@ import org.apache.spark.sql.types.{DataType, IntegerType} /** * Specifies how tuples that share common expressions will be distributed when a query is executed - * in parallel on many machines. Distribution can be used to refer to two distinct physical - * properties: - * - Inter-node partitioning of data: In this case the distribution describes how tuples are - *partitioned across physical machines in a cluster. Knowing this property allows some - *operators (e.g., Aggregate) to perform partition local operations instead of global ones. - * - Intra-partition ordering of data: In this case the distribution describes guarantees made - *about how tuples are distributed within a single partition. + * in parallel on many machines. + * + * Distribution here refers to inter-node partitioning of data: + * The distribution describes how tuples are partitioned across physical machines in a cluster. + * Knowing this property allows some operators (e.g., Aggregate) to perform partition local + * operations instead of global ones. */ --- End diff -- for ordering, I think people can look at `OrderedDistribution`?
[GitHub] spark pull request #23201: [SPARK-26246][SQL] Infer date and timestamp types...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23201#discussion_r239687264 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala --- @@ -121,7 +122,26 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { DecimalType(bigDecimal.precision, bigDecimal.scale) } decimalTry.getOrElse(StringType) - case VALUE_STRING => StringType + case VALUE_STRING => +val stringValue = parser.getText --- End diff -- or the order doesn't matter?
[GitHub] spark pull request #23201: [SPARK-26246][SQL] Infer date and timestamp types...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23201#discussion_r239687213 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala --- @@ -121,7 +122,26 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { DecimalType(bigDecimal.precision, bigDecimal.scale) } decimalTry.getOrElse(StringType) - case VALUE_STRING => StringType + case VALUE_STRING => +val stringValue = parser.getText --- End diff -- I checked `PartitioningUtils.inferPartitionColumnValue`: it tries timestamp first and then date. Shall we follow that order?
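A prefix-matching parser shows why the inference order matters. The sketch below uses `java.time.format.DateTimeFormatter.parse(CharSequence, ParsePosition)`, which succeeds when the pattern matches only the start of the input. This is an illustration under that assumption, not the parser `JsonInferSchema` actually uses, and the patterns and names are hypothetical. If a prefix-matching date parser runs first, a timestamp string is misclassified as a date unless an extra length check is added. Trying timestamp first, the order the comment reports for `PartitioningUtils.inferPartitionColumnValue`, avoids that.

```scala
import java.text.ParsePosition
import java.time.format.DateTimeFormatter
import scala.util.Try

object InferOrderSketch {
  // Hypothetical patterns chosen for illustration only.
  private val timestampFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  private val dateFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")

  // Prefix parse: succeeds if the pattern matches the start of `s`, even when trailing
  // characters are left unconsumed (i.e. no length check is done). Errors are thrown.
  private def prefixParses(fmt: DateTimeFormatter, s: String): Boolean =
    Try(fmt.parse(s, new ParsePosition(0))).isSuccess

  def inferDateFirst(s: String): String =
    if (prefixParses(dateFmt, s)) "date"
    else if (prefixParses(timestampFmt, s)) "timestamp"
    else "string"

  def inferTimestampFirst(s: String): String =
    if (prefixParses(timestampFmt, s)) "timestamp"
    else if (prefixParses(dateFmt, s)) "date"
    else "string"
}
```

With the date-first order, `"2018-12-08 19:18:09"` is inferred as a date because only its first ten characters are checked. With the timestamp-first order, it is inferred as a timestamp, and a plain `"2018-12-08"` still falls through to date.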
[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r239686156 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper { expressions.flatMap(collectEvaluableUDFs) } - def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { -case plan: LogicalPlan => extract(plan) + def apply(plan: LogicalPlan): LogicalPlan = plan match { +// SPARK-26293: A subquery will be rewritten into join later, and will go through this rule +// eventually. Here we skip subquery, as Python UDF only needs to be extracted once. +case _: Subquery => plan --- End diff -- I agree it's a bit confusing, but that's how `Subquery` is designed to work. See how `RemoveRedundantAliases` catches `Subquery`. Making `ExtractPythonUDFs` idempotent is sufficient; skipping `Subquery` is just for extra safety, and may give a small perf improvement since the rule will run fewer times. In general, I think we should skip `Subquery` here. This is why we created `Subquery`: we expect rules that shouldn't run on a subquery to skip it. I'll check more rules later and see whether they need to skip `Subquery`.
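The two properties discussed here, an idempotent rule and a rule that leaves a `Subquery`-rooted plan alone, can be shown on a toy plan tree. The sketch below is hypothetical throughout. `Plan`, `Leaf`, `Project`, `Subquery`, and the string-based "UDF extraction" are invented for illustration and are not Catalyst's classes or the real `ExtractPythonUDFs` logic.

```scala
object SkipSubquerySketch {
  // Hypothetical mini plan tree; these are not Catalyst's classes.
  sealed trait Plan
  final case class Leaf(name: String) extends Plan
  final case class Project(exprs: Seq[String], child: Plan) extends Plan
  final case class Subquery(child: Plan) extends Plan

  // Toy "extraction": marks each not-yet-extracted Python UDF. Already-marked
  // expressions are left alone, so running it twice equals running it once.
  private def extract(p: Plan): Plan = p match {
    case Project(exprs, child) =>
      val marked = exprs.map(e => if (e.startsWith("pyUDF(")) s"extracted:$e" else e)
      Project(marked, extract(child))
    case Subquery(child) => Subquery(extract(child))
    case leaf: Leaf => leaf
  }

  // The rule: skip a plan rooted at Subquery, since (per the comment) it will be
  // rewritten into a join and visited by this rule later anyway.
  def apply(plan: Plan): Plan = plan match {
    case _: Subquery => plan
    case _ => extract(plan)
  }
}
```

Idempotence alone already makes a second visit harmless. The `Subquery` skip only avoids the redundant visit, which matches the comment's "extra safety plus a small perf win" framing.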
[GitHub] spark issue #23215: [SPARK-26263][SQL] Validate partition values with user p...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23215 thanks, merging to master!
[GitHub] spark issue #23239: [SPARK-26021][SQL][followup] only deal with NaN and -0.0...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23239 retest this please
[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23249#discussion_r239684697 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala --- @@ -22,13 +22,12 @@ import org.apache.spark.sql.types.{DataType, IntegerType} /** * Specifies how tuples that share common expressions will be distributed when a query is executed - * in parallel on many machines. Distribution can be used to refer to two distinct physical - * properties: - * - Inter-node partitioning of data: In this case the distribution describes how tuples are - *partitioned across physical machines in a cluster. Knowing this property allows some - *operators (e.g., Aggregate) to perform partition local operations instead of global ones. - * - Intra-partition ordering of data: In this case the distribution describes guarantees made - *about how tuples are distributed within a single partition. + * in parallel on many machines. + * + * Distribution here refers to inter-node partitioning of data: + * The distribution describes how tuples are partitioned across physical machines in a cluster. + * Knowing this property allows some operators (e.g., Aggregate) to perform partition local + * operations instead of global ones. */ --- End diff -- I intentionally removed everything about intra-partition ordering, as we never leverage it and no partitioning provides this property. Did I miss something?
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r239684490 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java --- @@ -25,14 +25,14 @@ import org.apache.spark.sql.types.StructType; /** - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * A mix-in interface for {@link Table}. Data sources can implement this interface to * provide data writing ability for batch processing. * * This interface is used to create {@link BatchWriteSupport} instances when end users run * {@code Dataset.write.format(...).option(...).save()}. */ @Evolving -public interface BatchWriteSupportProvider extends DataSourceV2 { +public interface SupportsBatchWrite extends Table { --- End diff -- I do think read-only or write-only is a necessary feature, according to what I've seen on the dev list. Maybe we should move `newScanBuilder` from `Table` to the mixin traits.
[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23208 @rdblue I tried to add `WriteBuilder`, but there is a difference between read and write: 1. For read, the `ScanBuilder` can collect a lot of information (column pruning, filter pushdown, etc.) together and create a single `Scan`. 2. For write, the options are different branches, not a combination; e.g., you can't do append and replaceWhere at the same time. Because of this, I feel we don't need `WriteBuilder`, just different mixin traits that create a `Write` for different purposes. Let me know if you have other ideas. Thanks for your review!
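The read/write asymmetry can be sketched in Java. On the read side, pruning and pushdown accumulate in one builder that yields one scan. On the write side, `SupportsAppend` and `SupportsReplaceWhere` are separate mixins that each create a `Write` directly. All names here are hypothetical illustrations of the idea in the comment, not the API that was merged.

```java
import java.util.ArrayList;
import java.util.List;

// What a scan will read (hypothetical, simplified).
final class Scan {
    final List<String> columns;
    final List<String> filters;

    Scan(List<String> columns, List<String> filters) {
        this.columns = columns;
        this.filters = filters;
    }
}

// Read side: pruning and pushdown accumulate in one builder, which yields one Scan.
final class SimpleScanBuilder {
    private final List<String> columns = new ArrayList<>();
    private final List<String> filters = new ArrayList<>();

    SimpleScanBuilder pruneColumns(List<String> required) {
        columns.clear();
        columns.addAll(required);
        return this;
    }

    SimpleScanBuilder pushFilter(String filter) {
        filters.add(filter);
        return this;
    }

    Scan build() {
        return new Scan(new ArrayList<>(columns), new ArrayList<>(filters));
    }
}

// Write side: append and replaceWhere are alternatives, so each mixin creates its Write directly.
interface Write {
    String describe();
}

interface SupportsAppend {
    Write newAppend();
}

interface SupportsReplaceWhere {
    Write newReplaceWhere(String condition);
}
```

Nothing in this sketch settles whether a write-side builder is needed; that remains the open question in the thread.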
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r239683592 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,52 +17,49 @@ package org.apache.spark.sql.execution.datasources.v2 -import java.util.UUID - -import scala.collection.JavaConverters._ +import java.util.{Optional, UUID} import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport import org.apache.spark.sql.types.StructType /** - * A logical plan representing a data source v2 scan. + * A logical plan representing a data source v2 table. * - * @param source An instance of a [[DataSourceV2]] implementation. - * @param options The options for this scan. Used to create fresh [[BatchWriteSupport]]. - * @param userSpecifiedSchema The user-specified schema for this scan. + * @param table The table that this relation represents. + * @param options The options for this table operation. It's used to create fresh [[ScanBuilder]] + *and [[BatchWriteSupport]]. */ case class DataSourceV2Relation( -// TODO: remove `source` when we finish API refactor for write. -source: TableProvider, -table: SupportsBatchRead, +table: Table, output: Seq[AttributeReference], -options: Map[String, String], -userSpecifiedSchema: Option[StructType] = None) +// TODO: use a simple case insensitive map instead. 
+options: DataSourceOptions) --- End diff -- Because this makes the code cleaner; otherwise I need to write more code to convert a map to `DataSourceOptions` multiple times inside `DataSourceV2Relation`. I don't have a strong preference here, and just picked the easiest approach for me. If you do think using a map here is clearer, I can add this extra code.
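The TODO in the diff asks for "a simple case insensitive map". A minimal Java sketch of one follows. `CaseInsensitiveOptions` is a hypothetical class, not Spark's `DataSourceOptions`. It relies on a `TreeMap` ordered by `String.CASE_INSENSITIVE_ORDER`, so `"path"`, `"Path"` and `"PATH"` resolve to the same entry.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical wrapper: a read-only options map with case-insensitive key lookup.
final class CaseInsensitiveOptions {
    private final Map<String, String> options;

    CaseInsensitiveOptions(Map<String, String> raw) {
        // Keys that differ only in case collapse into one entry.
        TreeMap<String, String> m = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        m.putAll(raw);
        this.options = Collections.unmodifiableMap(m);
    }

    Optional<String> get(String key) {
        return Optional.ofNullable(options.get(key));
    }

    Map<String, String> asMap() {
        return options;
    }
}
```

An alternative is to lower-case keys with `Locale.ROOT` on insertion and lookup. The sorted-map approach avoids touching the keys at all.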
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r239682984 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- @@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { assertNotBucketed("save") -val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) -if (classOf[DataSourceV2].isAssignableFrom(cls)) { - val source = cls.getConstructor().newInstance().asInstanceOf[DataSourceV2] - source match { -case provider: BatchWriteSupportProvider => - val sessionOptions = DataSourceV2Utils.extractSessionConfigs( -source, -df.sparkSession.sessionState.conf) - val options = sessionOptions ++ extraOptions - +val session = df.sparkSession +val cls = DataSource.lookupDataSource(source, session.sessionState.conf) +if (classOf[TableProvider].isAssignableFrom(cls)) { + val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider] + val sessionOptions = DataSourceV2Utils.extractSessionConfigs( +provider, session.sessionState.conf) + val options = sessionOptions ++ extraOptions + val dsOptions = new DataSourceOptions(options.asJava) + provider.getTable(dsOptions) match { +case table: SupportsBatchWrite => + val relation = DataSourceV2Relation.create(table, dsOptions) + // TODO: revisit it. We should not create the `AppendData` operator for `SaveMode.Append`. + // We should create new end-users APIs for the `AppendData` operator. --- End diff -- Yeah, that's why I only left a comment and asked to revisit it later. I think we will see a clearer picture after migrating the file source.
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r239682239 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java --- @@ -25,7 +25,10 @@ * The base interface for v2 data sources which don't have a real catalog. Implementations must * have a public, 0-arg constructor. * - * The major responsibility of this interface is to return a {@link Table} for read/write. + * The major responsibility of this interface is to return a {@link Table} for read/write. If you + * want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter` + * with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The + * table schema can be empty in this case. --- End diff -- I don't want to break existing use cases: file sources can overwrite/append to a non-existing location, and we still need to support that with `SaveMode`. Whatever the new write API ends up being, I think we still need to support `SaveMode` for a while.
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23207 The code looks much cleaner now!
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239677846 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -78,6 +80,7 @@ object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" + private val NS_TIMING_METRIC = "nanosecond" --- End diff -- Can we change it to `ms`? The core side can still be `ns`, but on the SQL side we truncate it to `ms`.
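Following the suggestion, here is a minimal Java sketch of keeping nanoseconds on the core side and showing milliseconds on the SQL side. `NsToMs` is a hypothetical helper. `TimeUnit.NANOSECONDS.toMillis` truncates rather than rounds. Because timings are usually summed across tasks, the sketch also shows why summing in nanoseconds before truncating once is more accurate than truncating each value.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: nanosecond timings in, millisecond display values out.
final class NsToMs {
    // Convert one nanosecond timing to whole milliseconds; toMillis truncates toward zero.
    static long toMillis(long nanos) {
        return TimeUnit.NANOSECONDS.toMillis(nanos);
    }

    // Sum in nanoseconds first, then truncate once, so per-task truncation errors don't accumulate.
    static long totalMillis(long... nanos) {
        long sum = 0L;
        for (long n : nanos) {
            sum += n;
        }
        return toMillis(sum);
    }
}
```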
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239677653 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala --- @@ -333,8 +343,19 @@ object ShuffleExchangeExec { new ShuffleDependency[Int, InternalRow, InternalRow]( rddWithPartitionIds, new PartitionIdPassthrough(part.numPartitions), -serializer) +serializer, +shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics)) dependency } + + /** + * Create a customized [[ShuffleWriteProcessor]] for SQL which wrap the default metrics reporter + * with [[SQLShuffleWriteMetricsReporter]] as new reporter for [[ShuffleWriteProcessor]]. + */ + def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = { +(reporter: ShuffleWriteMetricsReporter) => { --- End diff -- Does this work with Scala 2.11? Maybe we don't need to be that fancy and can just write ``` new ShuffleWriteProcessor { xxx } ```
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239677477 --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.MapStatus + +/** + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask. + */ +private[spark] trait ShuffleWriteProcessor extends Serializable with Logging { + + /** + * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, always return a proxy + * reporter for both local accumulator and original reporter updating. As the reporter is a + * per-row operator, here need a careful consideration on performance. 
+ */ + def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): ShuffleWriteMetricsReporter --- End diff -- After that, we can just make `ShuffleWriteProcessor` a class.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239677325 --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriterProcessor.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.MapStatus + +/** + * The interface for customizing shuffle write process. The driver create a ShuffleWriteProcessor + * and put it into [[ShuffleDependency]], and executors use it in each ShuffleMapTask. + */ +private[spark] trait ShuffleWriteProcessor extends Serializable with Logging { + + /** + * Create a [[ShuffleWriteMetricsReporter]] from the default reporter, always return a proxy + * reporter for both local accumulator and original reporter updating. As the reporter is a + * per-row operator, here need a careful consideration on performance. 
+ */ + def createMetricsReporter(reporter: ShuffleWriteMetricsReporter): ShuffleWriteMetricsReporter --- End diff -- How about `def createMetricsReporter(context: TaskContext)`? Then in core it's implemented as ``` context.taskMetrics().shuffleWriteMetrics ``` and in SQL as ``` new SQLShuffle.Reporter(context.taskMetrics().shuffleWriteMetrics) ```
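A Java sketch of the suggested shape follows. `ShuffleWriteProcessor` is a plain class whose default `createMetricsReporter(context)` returns the task's own shuffle write metrics. The SQL subclass overrides it to return a proxy that updates a SQL-side counter and forwards to the original reporter. `TaskContextStub`, the single `incRecordsWritten` metric, and every class here are hypothetical simplifications, not Spark's types.

```java
// Hypothetical, simplified stand-in for Spark's reporter interface (one metric only).
interface ShuffleWriteMetricsReporter {
    void incRecordsWritten(long v);
}

// Default (core) reporter: accumulates into the task's own shuffle write metrics.
final class TaskShuffleWriteMetrics implements ShuffleWriteMetricsReporter {
    long recordsWritten;

    @Override public void incRecordsWritten(long v) { recordsWritten += v; }
}

// Hypothetical stand-in for TaskContext exposing the task's shuffle write metrics.
final class TaskContextStub {
    final TaskShuffleWriteMetrics shuffleWriteMetrics = new TaskShuffleWriteMetrics();
}

// As a plain class, the core behavior is simply the default implementation.
class ShuffleWriteProcessor {
    ShuffleWriteMetricsReporter createMetricsReporter(TaskContextStub context) {
        return context.shuffleWriteMetrics;
    }
}

// SQL proxy reporter: updates a SQL-side metric and forwards to the original reporter.
final class SqlShuffleWriteReporter implements ShuffleWriteMetricsReporter {
    private final ShuffleWriteMetricsReporter delegate;
    long sqlRecordsWritten;

    SqlShuffleWriteReporter(ShuffleWriteMetricsReporter delegate) { this.delegate = delegate; }

    @Override public void incRecordsWritten(long v) {
        sqlRecordsWritten += v;        // SQL metric shown in the UI
        delegate.incRecordsWritten(v); // keep the task metrics correct too
    }
}

final class SqlShuffleWriteProcessor extends ShuffleWriteProcessor {
    @Override
    ShuffleWriteMetricsReporter createMetricsReporter(TaskContextStub context) {
        return new SqlShuffleWriteReporter(context.shuffleWriteMetrics);
    }
}
```

The proxy is called once per record, so a real implementation would need to keep `incRecordsWritten` cheap, as the doc comment in the diff warns.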
[GitHub] spark issue #23244: [SPARK-26289][CORE]cleanup enablePerfMetrics parameter f...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23244 thanks, merging to master!
[GitHub] spark pull request #23244: [SPARK-26289][CORE]cleanup enablePerfMetrics para...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23244#discussion_r239675382 --- Diff: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java --- @@ -209,23 +205,14 @@ public BytesToBytesMap( TaskMemoryManager taskMemoryManager, --- End diff -- OK, if it's only used by tests, then we are fine.
[GitHub] spark pull request #23201: [SPARK-26246][SQL] Infer date and timestamp types...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23201#discussion_r239539848 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala --- @@ -121,7 +122,26 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { DecimalType(bigDecimal.precision, bigDecimal.scale) } decimalTry.getOrElse(StringType) - case VALUE_STRING => StringType + case VALUE_STRING => +val stringValue = parser.getText --- End diff -- Sure. How many text data sources already support it?
[GitHub] spark pull request #23201: [SPARK-26246][SQL] Infer date and timestamp types...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23201#discussion_r239534668 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala --- @@ -121,7 +122,26 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { DecimalType(bigDecimal.precision, bigDecimal.scale) } decimalTry.getOrElse(StringType) - case VALUE_STRING => StringType + case VALUE_STRING => +val stringValue = parser.getText --- End diff -- Shall we abstract out this logic for all the text sources?
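One possible shape for the shared logic is sketched below in Java with hypothetical names. A single helper, which any text source could call on a string value, tries a timestamp pattern, then a date pattern, and otherwise falls back to string. The ISO formatters used in the check are assumptions; a real source would build its formatters from its own date and timestamp format options.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical result type for inference on a single string value.
enum InferredType { TIMESTAMP, DATE, STRING }

// Hypothetical helper that JSON, CSV, or any other text source could share.
final class StringTypeInference {
    private final DateTimeFormatter timestampFormat;
    private final DateTimeFormatter dateFormat;

    StringTypeInference(DateTimeFormatter timestampFormat, DateTimeFormatter dateFormat) {
        this.timestampFormat = timestampFormat;
        this.dateFormat = dateFormat;
    }

    // Try the more specific type first, then fall back.
    InferredType infer(String value) {
        if (parses(() -> LocalDateTime.parse(value, timestampFormat))) return InferredType.TIMESTAMP;
        if (parses(() -> LocalDate.parse(value, dateFormat))) return InferredType.DATE;
        return InferredType.STRING;
    }

    // Returns true if the parse attempt completes without a DateTimeParseException.
    private static boolean parses(Runnable attempt) {
        try {
            attempt.run();
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }
}
```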
[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23249#discussion_r239508488 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala --- @@ -118,10 +116,13 @@ case class HashClusteredDistribution( /** * Represents data where tuples have been ordered according to the `ordering` - * [[Expression Expressions]]. This is a strictly stronger guarantee than - * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the - * same value for the ordering expressions are contiguous and will never be split across - * partitions. + * [[Expression Expressions]]. + * + * Tuples that share the same values for the ordering expressions must be contiguous within a + * partition. They can also across partitions, but these partitions must be contiguous. For example, + * if value `v` is the biggest values in partition 3, it can also be in partition 4 as the smallest + * value. If all the values in partition 4 are `v`, it can also be in partition 5 as the smallest + * value. */ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution { --- End diff -- This is only used by sort, and sort doesn't require rows with the same value to be colocated in the same partition. Actually, we already use this knowledge to optimize `RangePartitioning.satisfy`.
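The relaxed `OrderedDistribution` contract can be written as a check over the partitions taken in order. Here is a minimal Java sketch with a hypothetical helper that handles integer values only. Each partition must be sorted, and the last value of one non-empty partition must not exceed the first value of the next. Under that rule a repeated value such as `v` may span several adjacent partitions, matching the example in the new doc.

```java
import java.util.List;

// Hypothetical checker for the relaxed ordered-distribution property.
final class OrderedDistributionCheck {
    static boolean satisfies(List<List<Integer>> partitions) {
        Integer prevLast = null; // last value of the previous non-empty partition
        for (List<Integer> p : partitions) {
            // Within a partition, values must be non-decreasing.
            for (int i = 1; i < p.size(); i++) {
                if (p.get(i - 1) > p.get(i)) return false;
            }
            if (p.isEmpty()) continue;
            // Across partitions, equal values may straddle the boundary but must not go backwards.
            if (prevLast != null && prevLast > p.get(0)) return false;
            prevLast = p.get(p.size() - 1);
        }
        return true;
    }
}
```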
[GitHub] spark pull request #23239: [SPARK-26021][SQL][followup] only deal with NaN a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23239#discussion_r239507673 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java --- @@ -198,11 +198,45 @@ protected final void writeLong(long offset, long value) { Platform.putLong(getBuffer(), offset, value); } + // We need to take care of NaN and -0.0 in several places: + // 1. When compare values, different NaNs should be treated as same, `-0.0` and `0.0` should be + // treated as same. + // 2. In range partitioner, different NaNs should belong to the same partition, -0.0 and 0.0 --- End diff -- It turns out this is not a problem. The doc of `RangePartitioning` is misleading. I'm updating the doc at https://github.com/apache/spark/pull/23249
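The code comment raises a comparison concern: different NaNs should compare equal, and so should `-0.0` and `0.0`. Below is a minimal Java sketch of canonicalizing doubles before they are written or compared. It is a hypothetical helper, not `UnsafeWriter`'s code. Every NaN is mapped to the single `Double.NaN` bit pattern, and `-0.0` is mapped to `0.0`, so a bitwise comparison of the normalized values agrees with the intended equality.

```java
// Hypothetical helper: canonicalize doubles so bitwise equality matches value semantics.
final class FloatingPointNormalizer {
    static double normalize(double v) {
        if (Double.isNaN(v)) return Double.NaN; // one canonical NaN bit pattern
        if (v == 0.0d) return 0.0d;             // true for both 0.0 and -0.0
        return v;
    }
}
```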
[GitHub] spark issue #23249: [SPARK-26297][SQL] improve the doc of Distribution/Parti...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23249 cc @maryannxue @hvanhovell @gatorsmile @viirya
[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/23249 [SPARK-26297][SQL] improve the doc of Distribution/Partitioning ## What changes were proposed in this pull request? Some of the `Distribution/Partitioning` docs are stale and misleading; this PR fixes them: 1. `ClusteredDistribution` doesn't have an intra-partition requirement. 2. `OrderedDistribution` does not require tuples that share the same value to be colocated in the same partition. 3. `RangePartitioning` can provide a weaker guarantee for a prefix of its `ordering` expressions. ## How was this patch tested? Comment-only PR. You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark doc Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23249.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23249 commit 24ea28abd5a385351703335df33b26838d203fe3 Author: Wenchen Fan Date: 2018-12-06T15:47:23Z improve the doc of Distribution/Partitioning
[GitHub] spark issue #23248: [SPARK-26293][SQL] Cast exception when having python udf...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23248 retest this please
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r239469368 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java --- @@ -25,14 +25,14 @@ import org.apache.spark.sql.types.StructType; /** - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * A mix-in interface for {@link Table}. Data sources can implement this interface to * provide data writing ability for batch processing. * * This interface is used to create {@link BatchWriteSupport} instances when end users run * {@code Dataset.write.format(...).option(...).save()}. */ @Evolving -public interface BatchWriteSupportProvider extends DataSourceV2 { +public interface SupportsBatchWrite extends Table { --- End diff -- That's why I left https://github.com/apache/spark/pull/23208#discussion_r238524973. Naming suggestions are welcome!
[GitHub] spark pull request #23215: [SPARK-26263][SQL] Validate partition values with...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23215#discussion_r239453312 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1396,6 +1396,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val VALIDATE_PARTITION_COLUMNS = +buildConf("spark.sql.sources.validatePartitionColumns") + .internal() + .doc("When this option is set to true, partition column values will be validated with " + +"provided schema. If the validation fails, a runtime exception is thrown." + +"When this option is set to false, the partition column value will be converted to null " + +"if it can not be converted to corresponding provided schema.") --- End diff -- `... can not be casted to ...`
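Below is a minimal Java sketch of the two behaviors that the `spark.sql.sources.validatePartitionColumns` doc string describes. A raw partition value taken from a directory name (for example `part=abc`) is cast to an `int` column. With validation on, a bad value fails fast. With validation off, it becomes `null`. `PartitionValueCaster` is hypothetical, the `validate` flag stands in for the config, and only the int type is handled.

```java
// Hypothetical helper mirroring the two documented behaviors of the config.
final class PartitionValueCaster {
    static Integer castToInt(String raw, boolean validate) {
        try {
            return Integer.valueOf(raw);
        } catch (NumberFormatException e) {
            if (validate) {
                // validate = true: surface the bad value as a runtime exception.
                throw new IllegalArgumentException(
                    "Partition value '" + raw + "' can not be cast to int", e);
            }
            // validate = false: silently fall back to null.
            return null;
        }
    }
}
```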
[GitHub] spark pull request #23215: [SPARK-26263][SQL] Validate partition values with...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23215#discussion_r239453026 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala --- @@ -95,6 +95,31 @@ class FileIndexSuite extends SharedSQLContext { } } + test("SPARK-26263: Throw exception when partition value can't be converted to specific type") { --- End diff -- `can't be casted to user-specified type`
[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23213 These 3 combinations LGTM.
[GitHub] spark pull request #23244: [SPARK-26289][CORE]cleanup enablePerfMetrics para...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23244#discussion_r239451274 --- Diff: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java --- @@ -209,23 +205,14 @@ public BytesToBytesMap( TaskMemoryManager taskMemoryManager, --- End diff -- If this constructor is called, `enablePerfMetrics` will be false. Where do we use this constructor?
[GitHub] spark issue #23248: [SPARK-26293][SQL] Cast exception when having python udf...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23248 cc @icexelloss @HyukjinKwon @ueshin @viirya @gatorsmile
[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r239430315 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -60,8 +60,12 @@ private class BatchIterator[T](iter: Iterator[T], batchSize: Int) /** * A logical plan that evaluates a [[PythonUDF]]. */ -case class ArrowEvalPython(udfs: Seq[PythonUDF], output: Seq[Attribute], child: LogicalPlan) - extends UnaryNode +case class ArrowEvalPython( +udfs: Seq[PythonUDF], +output: Seq[Attribute], +child: LogicalPlan) extends UnaryNode { + override def producedAttributes: AttributeSet = AttributeSet(output.drop(child.output.length)) --- End diff -- A different but related fix, so that `missingAttributes` is calculated correctly.
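Here is a Java sketch of why `producedAttributes` matters. It assumes missing input is computed roughly as Catalyst's `QueryPlan.missingInput` does it: referenced attributes, minus the child's output, minus the attributes the node produces itself. Attribute names are plain strings, and `MissingInputSketch` is a hypothetical helper. The UDF result attribute appears in the node's output but not in the child's. Without subtracting it, the node would look like it references an input nobody provides.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper modeling attribute bookkeeping with plain strings.
final class MissingInputSketch {
    // The trailing part of the output that the child does not provide, as in
    // output.drop(child.output.length).
    static List<String> producedAttributes(List<String> output, List<String> childOutput) {
        return output.subList(childOutput.size(), output.size());
    }

    // Referenced attributes that neither come from the child nor are produced by this node.
    static Set<String> missingInput(Set<String> references, List<String> childOutput,
                                    List<String> produced) {
        Set<String> missing = new LinkedHashSet<>(references);
        missing.removeAll(childOutput);
        missing.removeAll(produced);
        return missing;
    }
}
```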
[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r239430084 --- Diff: python/pyspark/sql/tests/test_udf.py --- @@ -23,7 +23,7 @@ from pyspark import SparkContext from pyspark.sql import SparkSession, Column, Row -from pyspark.sql.functions import UserDefinedFunction +from pyspark.sql.functions import UserDefinedFunction, udf --- End diff -- Add the import here, as a lot of tests use it.
[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/23248 [SPARK-26293][SQL] Cast exception when having python udf in subquery ## What changes were proposed in this pull request? This is a regression introduced by https://github.com/apache/spark/pull/22104 in Spark 2.4.0. When we have a Python UDF in a subquery, we hit an exception ``` Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.AttributeReference cannot be cast to org.apache.spark.sql.catalyst.expressions.PythonUDF at scala.collection.immutable.Stream.map(Stream.scala:414) at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:98) at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:815) ... ``` https://github.com/apache/spark/pull/22104 turned `ExtractPythonUDFs` from a physical rule into an optimizer rule. However, there is a difference between a physical rule and an optimizer rule. A physical rule always runs once, while an optimizer rule may be applied twice on a query tree, even if the rule is located in a batch that only runs once. For a subquery, the `OptimizeSubqueries` rule executes the entire optimizer on the query plan inside the subquery. Later on, the subquery is turned into joins, and the optimizer rules are applied to it again. Unfortunately, the `ExtractPythonUDFs` rule is not idempotent. When it's applied twice on a query plan inside a subquery, it produces a malformed plan: it extracts Python UDFs from Python exec plans. This PR proposes 2 changes to be doubly safe: 1. `ExtractPythonUDFs` should skip Python exec plans, to make the rule idempotent. 2. `ExtractPythonUDFs` should skip subqueries. ## How was this patch tested? A new test.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark python Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23248.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23248 commit 9477fb09b850b981862cb72b0ebdebc5b404a082 Author: Wenchen Fan Date: 2018-12-06T11:16:04Z python udf in subquery
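This toy Java model illustrates the first proposed change, making the rule idempotent. The plan classes are hypothetical and far simpler than Catalyst's, and expressions are plain strings. The rule pulls Python-UDF expressions out of a `Project` into an `EvalPython` node below it. It leaves the UDFs already held by an existing `EvalPython` alone and only recurses into that node's child. Applying the rule a second time therefore returns an identical plan. A variant that also re-extracted the UDFs inside `EvalPython` would replace them with attribute references, the kind of malformed plan the description above reports.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified plan nodes (not Catalyst classes).
abstract class Plan {}

final class Leaf extends Plan {
    @Override public String toString() { return "Leaf"; }
}

final class Project extends Plan {
    final List<String> exprs;
    final Plan child;

    Project(List<String> exprs, Plan child) { this.exprs = exprs; this.child = child; }

    @Override public String toString() { return "Project" + exprs + "(" + child + ")"; }
}

final class EvalPython extends Plan {
    final List<String> udfs;
    final Plan child;

    EvalPython(List<String> udfs, Plan child) { this.udfs = udfs; this.child = child; }

    @Override public String toString() { return "EvalPython" + udfs + "(" + child + ")"; }
}

final class ExtractUdfsSketch {
    static boolean isUdf(String expr) { return expr.startsWith("pyUdf("); }

    static Plan apply(Plan plan) {
        if (plan instanceof EvalPython) {
            // Skip UDFs that were already extracted; only rewrite below this node.
            EvalPython ep = (EvalPython) plan;
            return new EvalPython(ep.udfs, apply(ep.child));
        }
        if (plan instanceof Project) {
            Project p = (Project) plan;
            Plan child = apply(p.child);
            List<String> udfs = new ArrayList<>();
            List<String> rewritten = new ArrayList<>();
            for (String expr : p.exprs) {
                if (isUdf(expr)) {
                    udfs.add(expr);                              // evaluate in EvalPython
                    rewritten.add("udfResult" + (udfs.size() - 1)); // reference its output
                } else {
                    rewritten.add(expr);
                }
            }
            if (udfs.isEmpty()) return new Project(rewritten, child);
            return new Project(rewritten, new EvalPython(udfs, child));
        }
        return plan;
    }
}
```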