[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r202093665 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = { val nameToType = getFieldMap(schema) +def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match { + case decimal: JBigDecimal => +decimal.scale == decimalMeta.getScale + case _ => false +} + +// Since SPARK-24716, ParquetFilter accepts parquet file schema to convert to +// data source Filter. This must make sure that filter value matched the Filter. +// If doesn't matched, then the schema used to read the file is incorrect, +// which would cause data corruption. +def valueCanMakeFilterOn(name: String, value: Any): Boolean = { + value == null || (nameToType(name) match { +case ParquetBooleanType => value.isInstanceOf[JBoolean] +case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number] +case ParquetLongType => value.isInstanceOf[JLong] +case ParquetFloatType => value.isInstanceOf[JFloat] +case ParquetDoubleType => value.isInstanceOf[JDouble] +case ParquetStringType => value.isInstanceOf[String] +case ParquetBinaryType => value.isInstanceOf[Array[Byte]] +case ParquetDateType => value.isInstanceOf[Date] --- End diff -- Not in this PR that adds Decimal support. We should consider it in the future, though. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r202093508 --- Diff: sql/core/benchmarks/FilterPushdownBenchmark-results.txt --- @@ -292,120 +292,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz Select 1 decimal(9, 2) row (value = 7864320): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative -Parquet Vectorized3785 / 3867 4.2 240.6 1.0X -Parquet Vectorized (Pushdown) 3820 / 3928 4.1 242.9 1.0X -Native ORC Vectorized 3981 / 4049 4.0 253.1 1.0X -Native ORC Vectorized (Pushdown) 702 / 735 22.4 44.6 5.4X +Parquet Vectorized4407 / 4852 3.6 280.2 1.0X +Parquet Vectorized (Pushdown) 1602 / 1634 9.8 101.8 2.8X --- End diff -- I'm not sure I understand. That's less than 2^24, so it should fit in an int. It should also fit in 8 base-ten digits so decimal(9,2) should work. And last, if the values don't fit in an int, I'm not sure how we would be able to store them in the first place, regardless of how stats are handled. Did you verify that there are no stats for the file produced here? If that's the case, it would make sense with these numbers. I think we just need to look for a different reason why stats are missing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
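As a quick, illustrative check of the arithmetic in this comment (not part of the PR), the benchmark value poses no range problem in any representation:

```scala
// Illustrative only (not from the PR): 7864320 fits every representation under discussion.
val value = new java.math.BigDecimal("7864320.00")
assert(value.unscaledValue().bitLength() < 32)  // unscaled 786432000 fits in a signed INT32
assert(value.precision() <= 9)                  // 9 significant digits, so decimal(9, 2) holds it
assert(7864320 < (1 << 24))                     // and the raw value is below 2^24
```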
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r202090024 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -248,29 +371,29 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: // Probably I missed something and obviously this should be changed. predicate match { - case sources.IsNull(name) if canMakeFilterOn(name) => + case sources.IsNull(name) if canMakeFilterOn(name, null) => makeEq.lift(nameToType(name)).map(_(name, null)) - case sources.IsNotNull(name) if canMakeFilterOn(name) => + case sources.IsNotNull(name) if canMakeFilterOn(name, null) => makeNotEq.lift(nameToType(name)).map(_(name, null)) - case sources.EqualTo(name, value) if canMakeFilterOn(name) => + case sources.EqualTo(name, value) if canMakeFilterOn(name, value) => makeEq.lift(nameToType(name)).map(_(name, value)) - case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name) => + case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) => makeNotEq.lift(nameToType(name)).map(_(name, value)) - case sources.EqualNullSafe(name, value) if canMakeFilterOn(name) => + case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) => makeEq.lift(nameToType(name)).map(_(name, value)) - case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name) => + case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) => makeNotEq.lift(nameToType(name)).map(_(name, value)) --- End diff -- Maybe I'm missing something, but that returns true for all null values. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21118

@cloud-fan, I'd like to get this PR in by 2.4.0. Now that the change to push predicates and projections happens when converting to the physical plan, this can go in. I've rebased this on master and updated it.

This changes the DSv2 API to primarily use InternalRow. It ensures that the rows are UnsafeRow by adding a projection on top of the physical scan node. This projection is actually *more* efficient than the current read path because filters are run before the projection. This means, for example, that the Parquet reader can avoid those two projections that currently happen in the scan node.
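For context, a rough sketch of the approach described here (illustrative only, not the actual planner code; names are assumed): the v2 reader only has to produce `InternalRow`, and a final unsafe projection converts rows after any pushed filters have already run.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow}

// Sketch only: wrap a v2 reader's InternalRow iterator so that downstream operators see
// UnsafeRow, without requiring the source itself to produce unsafe rows.
def toUnsafeRows(output: Seq[Attribute], rows: Iterator[InternalRow]): Iterator[UnsafeRow] = {
  val projection = UnsafeProjection.create(output, output)
  rows.map(row => projection(row))
}
```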
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r201763831 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -248,29 +371,29 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: // Probably I missed something and obviously this should be changed. predicate match { - case sources.IsNull(name) if canMakeFilterOn(name) => + case sources.IsNull(name) if canMakeFilterOn(name, null) => makeEq.lift(nameToType(name)).map(_(name, null)) - case sources.IsNotNull(name) if canMakeFilterOn(name) => + case sources.IsNotNull(name) if canMakeFilterOn(name, null) => makeNotEq.lift(nameToType(name)).map(_(name, null)) - case sources.EqualTo(name, value) if canMakeFilterOn(name) => + case sources.EqualTo(name, value) if canMakeFilterOn(name, value) => makeEq.lift(nameToType(name)).map(_(name, value)) - case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name) => + case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) => makeNotEq.lift(nameToType(name)).map(_(name, value)) - case sources.EqualNullSafe(name, value) if canMakeFilterOn(name) => + case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) => makeEq.lift(nameToType(name)).map(_(name, value)) - case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name) => + case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) => makeNotEq.lift(nameToType(name)).map(_(name, value)) --- End diff -- Since makeNotEq is also used for EqualNullSafe, I think it should handle null values as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
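For context, a small illustration (not from this PR) of why null handling matters here: in parquet-mr, `eq`/`notEq` with a null value is how "is null" / "is not null" are expressed, so the builders reached from `EqualNullSafe` must tolerate a null value rather than casting it unconditionally.

```scala
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.FilterApi.intColumn

// eq(col, null) matches rows where the column is null; notEq(col, null) matches non-null rows.
// A builder that unconditionally unboxes the value would throw a NullPointerException here.
val isNull = FilterApi.eq(intColumn("x"), null.asInstanceOf[Integer])
val isNotNull = FilterApi.notEq(intColumn("x"), null.asInstanceOf[Integer])
```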
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r201763489 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -37,41 +39,64 @@ import org.apache.spark.unsafe.types.UTF8String /** * Some utility function to convert Spark data source filters to Parquet filters. */ -private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) { +private[parquet] class ParquetFilters( +pushDownDate: Boolean, +pushDownDecimal: Boolean, +pushDownStartWith: Boolean) { private case class ParquetSchemaType( originalType: OriginalType, primitiveTypeName: PrimitiveTypeName, - decimalMetadata: DecimalMetadata) - - private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null) - private val ParquetByteType = ParquetSchemaType(INT_8, INT32, null) - private val ParquetShortType = ParquetSchemaType(INT_16, INT32, null) - private val ParquetIntegerType = ParquetSchemaType(null, INT32, null) - private val ParquetLongType = ParquetSchemaType(null, INT64, null) - private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null) - private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null) - private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null) - private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null) - private val ParquetDateType = ParquetSchemaType(DATE, INT32, null) + length: Int, + decimalMeta: DecimalMetadata) + + private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, 0, null) + private val ParquetByteType = ParquetSchemaType(INT_8, INT32, 0, null) + private val ParquetShortType = ParquetSchemaType(INT_16, INT32, 0, null) + private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0, null) + private val ParquetLongType = ParquetSchemaType(null, INT64, 0, null) + private val ParquetFloatType = ParquetSchemaType(null, FLOAT, 0, null) + private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, 0, null) + private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, 0, null) + private val ParquetBinaryType = ParquetSchemaType(null, BINARY, 0, null) + private val ParquetDateType = ParquetSchemaType(DATE, INT32, 0, null) private def dateToDays(date: Date): SQLDate = { DateTimeUtils.fromJavaDate(date) } + private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue() + + private def decimalToInt64(decimal: JBigDecimal): JLong = decimal.unscaledValue().longValue() + + private def decimalToByteArray(decimal: JBigDecimal, numBytes: Int): Binary = { +val decimalBuffer = new Array[Byte](numBytes) +val bytes = decimal.unscaledValue().toByteArray + +val fixedLengthBytes = if (bytes.length == numBytes) { + bytes +} else { + val signByte = if (bytes.head < 0) -1: Byte else 0: Byte + java.util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte) + System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length) + decimalBuffer +} +Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes) + } + private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { --- End diff -- Since `makeEq` is called for `EqualsNullSafe` and `valueCanMakeFilterOn` allows null values through, I think these could be null, like the String case. I think this should use the `Option` pattern from String for all values, unless I'm missing some reason why these will never be null. 
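A sketch of the suggested `Option` pattern for one of the numeric cases (illustrative; it mirrors the existing String case and assumes the same parquet-mr `FilterApi` helpers used in the diff above):

```scala
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.filter2.predicate.FilterApi.intColumn

// Null-tolerant equality builder: EqualNullSafe(name, null) reaches makeEq with v == null,
// so the value is wrapped in Option instead of being cast unconditionally.
val intEq: (String, Any) => FilterPredicate = (n: String, v: Any) =>
  FilterApi.eq(
    intColumn(n),
    Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull)
```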
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r201761849 --- Diff: sql/core/benchmarks/FilterPushdownBenchmark-results.txt --- @@ -292,120 +292,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz Select 1 decimal(9, 2) row (value = 7864320): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative -Parquet Vectorized3785 / 3867 4.2 240.6 1.0X -Parquet Vectorized (Pushdown) 3820 / 3928 4.1 242.9 1.0X -Native ORC Vectorized 3981 / 4049 4.0 253.1 1.0X -Native ORC Vectorized (Pushdown) 702 / 735 22.4 44.6 5.4X +Parquet Vectorized4407 / 4852 3.6 280.2 1.0X +Parquet Vectorized (Pushdown) 1602 / 1634 9.8 101.8 2.8X --- End diff -- Maybe it is that the data is more dense, so we need to read more values in the row group that contains the one we're looking for? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21305

I don't think we need (or want) `SaveMode` passed to writers after standardizing. Uses of `WriteSupport` will always append data to an existing table, which makes it simpler for writers. And it will be used for all writes.

A couple other notes:

* We will also need a `StagedTable` variant of `DeleteSupport` to support `ReplaceData` as an atomic operation, but I want to get the non-atomic variants in first; hopefully for 2.4.0.
* RTAS would use `dropTable` and not `DeleteSupport`, since it is at the table level.
* We may still use `SaveMode` in the DF writer API, which is still under discussion.
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r201756667 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -37,41 +39,64 @@ import org.apache.spark.unsafe.types.UTF8String /** * Some utility function to convert Spark data source filters to Parquet filters. */ -private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) { +private[parquet] class ParquetFilters( +pushDownDate: Boolean, +pushDownDecimal: Boolean, +pushDownStartWith: Boolean) { private case class ParquetSchemaType( originalType: OriginalType, primitiveTypeName: PrimitiveTypeName, - decimalMetadata: DecimalMetadata) - - private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null) - private val ParquetByteType = ParquetSchemaType(INT_8, INT32, null) - private val ParquetShortType = ParquetSchemaType(INT_16, INT32, null) - private val ParquetIntegerType = ParquetSchemaType(null, INT32, null) - private val ParquetLongType = ParquetSchemaType(null, INT64, null) - private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null) - private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null) - private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null) - private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null) - private val ParquetDateType = ParquetSchemaType(DATE, INT32, null) + length: Int, + decimalMeta: DecimalMetadata) + + private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, 0, null) + private val ParquetByteType = ParquetSchemaType(INT_8, INT32, 0, null) + private val ParquetShortType = ParquetSchemaType(INT_16, INT32, 0, null) + private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0, null) + private val ParquetLongType = ParquetSchemaType(null, INT64, 0, null) + private val ParquetFloatType = ParquetSchemaType(null, FLOAT, 0, null) + private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, 0, null) + private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, 0, null) + private val ParquetBinaryType = ParquetSchemaType(null, BINARY, 0, null) + private val ParquetDateType = ParquetSchemaType(DATE, INT32, 0, null) private def dateToDays(date: Date): SQLDate = { DateTimeUtils.fromJavaDate(date) } + private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue() + + private def decimalToInt64(decimal: JBigDecimal): JLong = decimal.unscaledValue().longValue() + + private def decimalToByteArray(decimal: JBigDecimal, numBytes: Int): Binary = { +val decimalBuffer = new Array[Byte](numBytes) +val bytes = decimal.unscaledValue().toByteArray + +val fixedLengthBytes = if (bytes.length == numBytes) { + bytes +} else { + val signByte = if (bytes.head < 0) -1: Byte else 0: Byte + java.util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte) + System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length) + decimalBuffer +} +Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes) --- End diff -- This byte array is not reused, it is allocated each time this function runs. This should use the `fromConstantByteArray` variant. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
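A sketch of the suggested change (illustrative; it keeps the sign-extension logic from the diff above and only swaps the final call):

```scala
import java.math.{BigDecimal => JBigDecimal}
import java.util.Arrays

import org.apache.parquet.io.api.Binary

// Same padding logic as the diff above, but since the buffer is freshly allocated on every
// call, it is handed to Parquet as a constant array rather than a reused one.
def decimalToByteArray(decimal: JBigDecimal, numBytes: Int): Binary = {
  val bytes = decimal.unscaledValue().toByteArray
  val fixedLengthBytes = if (bytes.length == numBytes) {
    bytes
  } else {
    val buffer = new Array[Byte](numBytes)
    val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
    Arrays.fill(buffer, 0, numBytes - bytes.length, signByte)
    System.arraycopy(bytes, 0, buffer, numBytes - bytes.length, bytes.length)
    buffer
  }
  Binary.fromConstantByteArray(fixedLengthBytes, 0, numBytes)
}
```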
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r201755545 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = { val nameToType = getFieldMap(schema) +def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match { + case decimal: JBigDecimal => +decimal.scale == decimalMeta.getScale + case _ => false +} + +// Since SPARK-24716, ParquetFilter accepts parquet file schema to convert to +// data source Filter. This must make sure that filter value matched the Filter. +// If doesn't matched, then the schema used to read the file is incorrect, +// which would cause data corruption. +def valueCanMakeFilterOn(name: String, value: Any): Boolean = { + value == null || (nameToType(name) match { +case ParquetBooleanType => value.isInstanceOf[JBoolean] +case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number] +case ParquetLongType => value.isInstanceOf[JLong] +case ParquetFloatType => value.isInstanceOf[JFloat] +case ParquetDoubleType => value.isInstanceOf[JDouble] +case ParquetStringType => value.isInstanceOf[String] +case ParquetBinaryType => value.isInstanceOf[Array[Byte]] +case ParquetDateType => value.isInstanceOf[Date] --- End diff -- Why is there no support for timestamp? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r201755353 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = { val nameToType = getFieldMap(schema) +def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match { + case decimal: JBigDecimal => +decimal.scale == decimalMeta.getScale + case _ => false +} + +// Since SPARK-24716, ParquetFilter accepts parquet file schema to convert to +// data source Filter. This must make sure that filter value matched the Filter. +// If doesn't matched, then the schema used to read the file is incorrect, +// which would cause data corruption. +def valueCanMakeFilterOn(name: String, value: Any): Boolean = { + value == null || (nameToType(name) match { +case ParquetBooleanType => value.isInstanceOf[JBoolean] +case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number] +case ParquetLongType => value.isInstanceOf[JLong] +case ParquetFloatType => value.isInstanceOf[JFloat] +case ParquetDoubleType => value.isInstanceOf[JDouble] +case ParquetStringType => value.isInstanceOf[String] +case ParquetBinaryType => value.isInstanceOf[Array[Byte]] +case ParquetDateType => value.isInstanceOf[Date] +case ParquetSchemaType(DECIMAL, INT32, 0, decimalMeta) => --- End diff -- Can the decimal cases be collapsed to a single case on `ParquetSchemaType(DECIMAL, _, _, decimalMetadata)`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
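The collapsed form might look like the match-arm fragment below (illustrative only; it reuses `ParquetSchemaType` and `isDecimalMatched` from the diff above and would replace the per-physical-type decimal cases):

```scala
// One case covers INT32-, INT64-, and FIXED_LEN_BYTE_ARRAY-backed decimals, since only the
// DecimalMetadata is needed for the scale check.
case ParquetSchemaType(DECIMAL, _, _, decimalMeta) => isDecimalMatched(value, decimalMeta)
```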
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r201754882 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = { val nameToType = getFieldMap(schema) +def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match { + case decimal: JBigDecimal => +decimal.scale == decimalMeta.getScale + case _ => false +} + +// Since SPARK-24716, ParquetFilter accepts parquet file schema to convert to --- End diff -- Is this issue reference correct? The PR says this is for SPARK-24549. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r201754998 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -202,6 +283,16 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: case ParquetDateType if pushDownDate => (n: String, v: Any) => FilterApi.gtEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer]) + +case ParquetSchemaType(DECIMAL, INT32, 0, _) if pushDownDecimal => --- End diff -- Why match 0 instead of _? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21305 @cloud-fan, yes. There is an open PR, #21308, that adds `DeleteSupport`. I'm not pushing for that just yet because I think `DeleteSupport` should be applied to `Table` after #21306 makes it in. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20933#discussion_r201427780 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- @@ -241,39 +240,47 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) if (classOf[DataSourceV2].isAssignableFrom(cls)) { val ds = cls.newInstance() - ds match { -case ws: WriteSupport => - val options = new DataSourceOptions((extraOptions ++ -DataSourceV2Utils.extractSessionConfigs( - ds = ds.asInstanceOf[DataSourceV2], - conf = df.sparkSession.sessionState.conf)).asJava) - // Using a timestamp and a random UUID to distinguish different writing jobs. This is good - // enough as there won't be tons of writing jobs created at the same second. - val jobId = new SimpleDateFormat("MMddHHmmss", Locale.US) -.format(new Date()) + "-" + UUID.randomUUID() - val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options) - if (writer.isPresent) { -runCommand(df.sparkSession, "save") { - WriteToDataSourceV2(writer.get(), df.logicalPlan) -} - } + val (needToFallBackFileDataSourceV2, fallBackFileFormat) = ds match { +case f: FileDataSourceV2 => + val disabledV2Readers = + df.sparkSession.sessionState.conf.disabledV2FileDataSourceWriter.split(",") + (disabledV2Readers.contains(f.shortName), f.fallBackFileFormat.getCanonicalName) +case _ => (false, source) + } -// Streaming also uses the data source V2 API. So it may be that the data source implements -// v2, but has no v2 implementation for batch writes. In that case, we fall back to saving -// as though it's a V1 source. -case _ => saveToV1Source() + if (ds.isInstanceOf[WriteSupport] && !needToFallBackFileDataSourceV2) { +val options = new DataSourceOptions((extraOptions ++ + DataSourceV2Utils.extractSessionConfigs( +ds = ds.asInstanceOf[DataSourceV2], +conf = df.sparkSession.sessionState.conf)).asJava) +// Using a timestamp and a random UUID to distinguish different writing jobs. This is good +// enough as there won't be tons of writing jobs created at the same second. +val jobId = new SimpleDateFormat("MMddHHmmss", Locale.US) + .format(new Date()) + "-" + UUID.randomUUID() +val writer = ds.asInstanceOf[WriteSupport] + .createWriter(jobId, df.logicalPlan.schema, mode, options) --- End diff -- It is. We're still evolving the v2 API and integration with Spark. This problem is addressed in PR #21305, which is the first of a series of changes to standardize the logical plans and fix problems like this one. There's also an [open proposal for those changes](https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r201399506 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- @@ -240,21 +238,27 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) if (classOf[DataSourceV2].isAssignableFrom(cls)) { - val ds = cls.newInstance() - ds match { + val source = cls.newInstance().asInstanceOf[DataSourceV2] + source match { case ws: WriteSupport => - val options = new DataSourceOptions((extraOptions ++ -DataSourceV2Utils.extractSessionConfigs( - ds = ds.asInstanceOf[DataSourceV2], - conf = df.sparkSession.sessionState.conf)).asJava) - // Using a timestamp and a random UUID to distinguish different writing jobs. This is good - // enough as there won't be tons of writing jobs created at the same second. - val jobId = new SimpleDateFormat("MMddHHmmss", Locale.US) -.format(new Date()) + "-" + UUID.randomUUID() - val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options) - if (writer.isPresent) { + val options = extraOptions ++ + DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf) + + val relation = DataSourceV2Relation.create(source, options.toMap) + if (mode == SaveMode.Append) { runCommand(df.sparkSession, "save") { - WriteToDataSourceV2(writer.get(), df.logicalPlan) + AppendData.byName(relation, df.logicalPlan) +} + + } else { +val writer = ws.createWriter( + UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode, --- End diff -- I see no good reason to over-complicate the unique string passed in. Here's a quote from wikipedia on the chance of a conflict (from [this SO answer](https://stackoverflow.com/questions/24876188/how-big-is-the-chance-to-get-a-java-uuid-randomuuid-collision)): ``` Only after generating 1 billion UUIDs every second for the next 100 years, the probability of creating just one duplicate would be about 50%. Or, to put it another way, the probability of one duplicate would be about 50% if every person on earth owned 600 million UUIDs. ``` Adding timestamp to a UUID to avoid collisions is unnecessary. For the other use, why would a user go to the temp directory of some node's file system -- which may not even be used by a given source -- instead of going to the logs? What if the user wants any other piece of information besides the starting timestamp (that's in some format that has to be converted)? In short, I don't agree with the argument that it is helpful to pass the old format. This is just a carry-over from making fake Hadoop job IDs (why it was called `jobId` and started with `job_`). It's debatable whether the write UUID itself is even useful given that there is no requirement to use it anywhere. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21305 @cloud-fan, I think we can ignore that last test failure because tests are passing on the last commit that made real changes. The latest commit only changed a comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21305 Retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21305 @cloud-fan, I've updated this and the tests are passing, so I think it is ready for another look. I just pushed a comments-only commit to fix the Javadoc for AppendData that @viirya pointed out (thanks!). Since that's only comments, it shouldn't affect test results. I think there's just one more point under discussion, which is the change from `jobId` (which isn't one) to `jobUUID` and dropping the timestamp. I don't think a timestamp helps avoid conflicts because it is astronomically unlikely that UUIDs will collide. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r200825129 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- @@ -240,21 +238,27 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) if (classOf[DataSourceV2].isAssignableFrom(cls)) { - val ds = cls.newInstance() - ds match { + val source = cls.newInstance().asInstanceOf[DataSourceV2] + source match { case ws: WriteSupport => - val options = new DataSourceOptions((extraOptions ++ -DataSourceV2Utils.extractSessionConfigs( - ds = ds.asInstanceOf[DataSourceV2], - conf = df.sparkSession.sessionState.conf)).asJava) - // Using a timestamp and a random UUID to distinguish different writing jobs. This is good - // enough as there won't be tons of writing jobs created at the same second. - val jobId = new SimpleDateFormat("MMddHHmmss", Locale.US) -.format(new Date()) + "-" + UUID.randomUUID() - val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options) - if (writer.isPresent) { + val options = extraOptions ++ + DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf) + + val relation = DataSourceV2Relation.create(source, options.toMap) + if (mode == SaveMode.Append) { runCommand(df.sparkSession, "save") { - WriteToDataSourceV2(writer.get(), df.logicalPlan) + AppendData.byName(relation, df.logicalPlan) +} + + } else { +val writer = ws.createWriter( + UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode, --- End diff -- How would random UUIDs conflict? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r200824906 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java --- @@ -38,15 +38,16 @@ * If this method fails (by throwing an exception), the action will fail and no Spark job will be * submitted. * - * @param jobId A unique string for the writing job. It's possible that there are many writing - * jobs running at the same time, and the returned {@link DataSourceWriter} can - * use this job id to distinguish itself from other jobs. + * @param writeUUID A unique string for the writing job. It's possible that there are many writing --- End diff -- This is not the ID of the Spark job that is writing. I think the UUID name is more clear about what is actually passed, a unique string that identifies the write. There's also no need to make the string more complicated than a UUID since there are no guarantees about it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r200824639 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2120,6 +2122,99 @@ class Analyzer( } } + /** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ + object ResolveOutputRelation extends Rule[LogicalPlan] { +override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case append @ AppendData(table: NamedRelation, query, isByName) + if table.resolved && query.resolved && !append.resolved => +val projection = resolveOutputColumns(table.name, table.output, query, isByName) + +if (projection != query) { + append.copy(query = projection) +} else { + append +} +} + +def resolveOutputColumns( +tableName: String, +expected: Seq[Attribute], +query: LogicalPlan, +byName: Boolean): LogicalPlan = { + + if (expected.size < query.output.size) { +throw new AnalysisException( + s"""Cannot write to '$tableName', too many data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) + } + + val errors = new mutable.ArrayBuffer[String]() + val resolved: Seq[NamedExpression] = if (byName) { +expected.flatMap { outAttr => + query.resolveQuoted(outAttr.name, resolver) match { +case Some(inAttr) if inAttr.nullable && !outAttr.nullable => + errors += s"Cannot write nullable values to non-null column '${outAttr.name}'" + None + +case Some(inAttr) if !outAttr.dataType.sameType(inAttr.dataType) => + Some(upcast(inAttr, outAttr)) + +case Some(inAttr) => + Some(inAttr) // matches nullability, datatype, and name + +case _ => + errors += s"Cannot find data for output column '${outAttr.name}'" + None + } +} + + } else { +if (expected.size > query.output.size) { --- End diff -- That check is the other direction: not enough columns. When matching by position, we need to have the same number of columns so we add this check (we already know that there aren't too few columns, so this checks for too many). When matching by name, we can call out specific columns that are missing, which is why we do the validation differently for the two cases. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
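A toy model of the two size checks being discussed (illustrative only, not Spark's code):

```scala
// Table columns in `table`, query output columns in `data`. The "too many columns" case is
// rejected up front for both modes; by name, missing columns are reported individually; by
// position, the only remaining mismatch is too few data columns.
def columnErrors(table: Seq[String], data: Seq[String], byName: Boolean): Seq[String] = {
  if (table.size < data.size) {
    Seq(s"too many data columns: ${data.mkString(", ")}")
  } else if (byName) {
    table.filterNot(data.contains).map(col => s"cannot find data for output column '$col'")
  } else if (table.size > data.size) {
    Seq(s"not enough data columns: ${data.mkString(", ")}")
  } else {
    Nil
  }
}
```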
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r200824599 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2120,6 +2122,99 @@ class Analyzer( } } + /** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ + object ResolveOutputRelation extends Rule[LogicalPlan] { +override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case append @ AppendData(table: NamedRelation, query, isByName) + if table.resolved && query.resolved && !append.resolved => +val projection = resolveOutputColumns(table.name, table.output, query, isByName) + +if (projection != query) { + append.copy(query = projection) +} else { + append +} +} + +def resolveOutputColumns( +tableName: String, +expected: Seq[Attribute], +query: LogicalPlan, +byName: Boolean): LogicalPlan = { + + if (expected.size < query.output.size) { +throw new AnalysisException( + s"""Cannot write to '$tableName', too many data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) + } + + val errors = new mutable.ArrayBuffer[String]() + val resolved: Seq[NamedExpression] = if (byName) { +expected.flatMap { outAttr => + query.resolveQuoted(outAttr.name, resolver) match { +case Some(inAttr) if inAttr.nullable && !outAttr.nullable => + errors += s"Cannot write nullable values to non-null column '${outAttr.name}'" --- End diff -- I would much rather have a job fail fast and give a clear error message than to fail during a write. I can see how adding such an assertion to the plan could be useful, so I'd consider it if someone wanted to add that feature later. Right now, though, I think this is good. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r200824602 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2120,6 +2122,99 @@ class Analyzer( } } + /** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ + object ResolveOutputRelation extends Rule[LogicalPlan] { +override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case append @ AppendData(table: NamedRelation, query, isByName) + if table.resolved && query.resolved && !append.resolved => +val projection = resolveOutputColumns(table.name, table.output, query, isByName) + +if (projection != query) { + append.copy(query = projection) +} else { + append +} +} + +def resolveOutputColumns( +tableName: String, +expected: Seq[Attribute], +query: LogicalPlan, +byName: Boolean): LogicalPlan = { + + if (expected.size < query.output.size) { +throw new AnalysisException( + s"""Cannot write to '$tableName', too many data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) + } + + val errors = new mutable.ArrayBuffer[String]() + val resolved: Seq[NamedExpression] = if (byName) { +expected.flatMap { outAttr => + query.resolveQuoted(outAttr.name, resolver) match { +case Some(inAttr) if inAttr.nullable && !outAttr.nullable => + errors += s"Cannot write nullable values to non-null column '${outAttr.name}'" + None + +case Some(inAttr) if !outAttr.dataType.sameType(inAttr.dataType) => --- End diff -- Yes, I'll update to check nested fields. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r200824532 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2120,6 +2122,99 @@ class Analyzer( } } + /** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ + object ResolveOutputRelation extends Rule[LogicalPlan] { +override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case append @ AppendData(table: NamedRelation, query, isByName) --- End diff -- Yes, I agree. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r200824504 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableCatalog.java --- @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.catalog; + +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.types.StructType; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public interface TableCatalog { + /** + * Load table metadata by {@link TableIdentifier identifier} from the catalog. + * + * @param ident a table identifier + * @return the table's metadata + * @throws NoSuchTableException If the table doesn't exist. + */ + Table loadTable(TableIdentifier ident) throws NoSuchTableException; + + /** + * Create a table in the catalog. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table already exists for the identifier + */ + default Table createTable(TableIdentifier ident, +StructType schema) throws TableAlreadyExistsException { +return createTable(ident, schema, Collections.emptyList(), Collections.emptyMap()); + } + + /** + * Create a table in the catalog. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @param properties a string map of table properties + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table already exists for the identifier + */ + default Table createTable(TableIdentifier ident, +StructType schema, +Map properties) throws TableAlreadyExistsException { +return createTable(ident, schema, Collections.emptyList(), properties); + } + + /** + * Create a table in the catalog. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @param partitions a list of expressions to use for partitioning data in the table + * @param properties a string map of table properties + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table already exists for the identifier + */ + Table createTable(TableIdentifier ident, +StructType schema, +List partitions, --- End diff -- I wouldn't say this way of passing partitioning is a new feature. 
It's just a generalization of the existing partitioning that allows us to pass any type of partition, whether it is bucketing or column-based. As for open discussion, this was proposed in the SPIP that was fairly widely read and commented on. That SPIP was posted to the dev list a few times, too. I do appreciate you wanting to make sure there's been a chance for the community to discuss it, but there has been plenty of opportunity to comment. At this point, I think it's reasonable to move forward with the implementation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21305 @cloud-fan, I've updated this with the requested changes. Thanks for looking at it! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r200711421 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -40,17 +44,24 @@ case class DataSourceV2Relation( source: DataSourceV2, output: Seq[AttributeReference], options: Map[String, String], -userSpecifiedSchema: Option[StructType]) - extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat { +tableIdent: Option[TableIdentifier] = None, +userSpecifiedSchema: Option[StructType] = None) + extends LeafNode with MultiInstanceRelation with NamedRelation with DataSourceV2StringFormat { import DataSourceV2Relation._ + override def name: String = { +tableIdent.map(_.unquotedString).getOrElse("unknown") --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r200710273 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableCatalog.java --- @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.catalog; + +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.types.StructType; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public interface TableCatalog { + /** + * Load table metadata by {@link TableIdentifier identifier} from the catalog. + * + * @param ident a table identifier + * @return the table's metadata + * @throws NoSuchTableException If the table doesn't exist. + */ + Table loadTable(TableIdentifier ident) throws NoSuchTableException; + + /** + * Create a table in the catalog. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table already exists for the identifier + */ + default Table createTable(TableIdentifier ident, +StructType schema) throws TableAlreadyExistsException { +return createTable(ident, schema, Collections.emptyList(), Collections.emptyMap()); + } + + /** + * Create a table in the catalog. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @param properties a string map of table properties + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table already exists for the identifier + */ + default Table createTable(TableIdentifier ident, +StructType schema, +Map properties) throws TableAlreadyExistsException { +return createTable(ident, schema, Collections.emptyList(), properties); + } + + /** + * Create a table in the catalog. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @param partitions a list of expressions to use for partitioning data in the table + * @param properties a string map of table properties + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table already exists for the identifier + */ + Table createTable(TableIdentifier ident, +StructType schema, +List partitions, --- End diff -- Another benefit: this would allow us to translate `BUCKETED BY` clauses into something we can actually pass to data sources. 
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r200709816 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableCatalog.java --- @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.catalog; + +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.types.StructType; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public interface TableCatalog { + /** + * Load table metadata by {@link TableIdentifier identifier} from the catalog. + * + * @param ident a table identifier + * @return the table's metadata + * @throws NoSuchTableException If the table doesn't exist. + */ + Table loadTable(TableIdentifier ident) throws NoSuchTableException; + + /** + * Create a table in the catalog. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table already exists for the identifier + */ + default Table createTable(TableIdentifier ident, +StructType schema) throws TableAlreadyExistsException { +return createTable(ident, schema, Collections.emptyList(), Collections.emptyMap()); + } + + /** + * Create a table in the catalog. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @param properties a string map of table properties + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table already exists for the identifier + */ + default Table createTable(TableIdentifier ident, +StructType schema, +Map properties) throws TableAlreadyExistsException { +return createTable(ident, schema, Collections.emptyList(), properties); + } + + /** + * Create a table in the catalog. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @param partitions a list of expressions to use for partitioning data in the table + * @param properties a string map of table properties + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table already exists for the identifier + */ + Table createTable(TableIdentifier ident, +StructType schema, +List partitions, --- End diff -- > The current end-user API only allows users to specify partition columns. I think an example would help understand the use of expression here. 
Right now, I can create a table partitioned by day like this: ``` CREATE TABLE t (ts timestamp, data string, day string) PARTITIONED BY (day) ``` Then it's up to queries to supply the right values for `day` in their queries. I'm proposing we change that to something like the following that uses expressions in the PARTITIONED BY clause instead of only allowing column names: ``` CREATE TABLE t (ts timestamp, data string) PARTITIONED BY (date(ts)); ``` This can handle all identity partitioning in Hive tables today and it can handle bucketing. > And why does the "partition transform" belong to a table definition? Transforms should be passed to the table so the source use them for the physical layout. In DataSourceV2, the source could be anything so it needs to be the component that handles
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r200428354 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -172,6 +173,7 @@ class Analyzer( ResolveWindowOrder :: ResolveWindowFrame :: ResolveNaturalAndUsingJoin :: + ResolveOutputRelation :: --- End diff -- This rule may add `Projection`, `UpCast`, and `Alias` nodes to the plan, so there are some rules in this batch that should be run after the output is resolved. `ResolveUpCast` will rewrite the casts that were inserted and throw exceptions if the cast would truncate and needs to run after this rule. I could also create a batch just after resolution for output resolution. We could just run this rule and `ResolveUpCast`. I think the optimizer will handle collapsing `Projection` nodes and aliases are only resolved in this batch, so adding resolved aliases shouldn't be a problem. Would you like a separate batch for output resolution? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r200423733 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -40,17 +44,24 @@ case class DataSourceV2Relation( source: DataSourceV2, output: Seq[AttributeReference], options: Map[String, String], -userSpecifiedSchema: Option[StructType]) - extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat { +tableIdent: Option[TableIdentifier] = None, +userSpecifiedSchema: Option[StructType] = None) + extends LeafNode with MultiInstanceRelation with NamedRelation with DataSourceV2StringFormat { import DataSourceV2Relation._ + override def name: String = { +tableIdent.map(_.unquotedString).getOrElse("unknown") --- End diff -- That's the name of the data source, not the name of the table. I'd be fine with updating this if you want to include the source name. What about `s"${source.name}:unknown"`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r200423206 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2120,6 +2122,99 @@ class Analyzer( } } + /** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ + object ResolveOutputRelation extends Rule[LogicalPlan] { +override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case append @ AppendData(table: NamedRelation, query, isByName) + if table.resolved && query.resolved && !append.resolved => +val projection = resolveOutputColumns(table.name, table.output, query, isByName) + +if (projection != query) { + append.copy(query = projection) +} else { + append +} +} + +def resolveOutputColumns( +tableName: String, +expected: Seq[Attribute], +query: LogicalPlan, +byName: Boolean): LogicalPlan = { + + if (expected.size < query.output.size) { +throw new AnalysisException( + s"""Cannot write to '$tableName', too many data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) + } + + val errors = new mutable.ArrayBuffer[String]() + val resolved: Seq[NamedExpression] = if (byName) { +expected.flatMap { outAttr => + query.resolveQuoted(outAttr.name, resolver) match { +case Some(inAttr) if inAttr.nullable && !outAttr.nullable => + errors += s"Cannot write nullable values to non-null column '${outAttr.name}'" + None + +case Some(inAttr) if !outAttr.dataType.sameType(inAttr.dataType) => + Some(upcast(inAttr, outAttr)) + +case Some(inAttr) => + Some(inAttr) // matches nullability, datatype, and name + +case _ => + errors += s"Cannot find data for output column '${outAttr.name}'" + None + } +} + + } else { +if (expected.size > query.output.size) { + throw new AnalysisException( +s"""Cannot write to '$tableName', not enough data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) +} + +query.output.zip(expected).flatMap { + case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable => +errors += s"Cannot write nullable values to non-null column '${outAttr.name}'" +None + + case (inAttr, outAttr) +if !inAttr.dataType.sameType(outAttr.dataType) || inAttr.name != outAttr.name => +Some(upcast(inAttr, outAttr)) + + case (inAttr, _) => +Some(inAttr) // matches nullability, datatype, and name +} + } + + if (errors.nonEmpty) { +throw new AnalysisException( + s"Cannot write incompatible data to table '$tableName':\n- ${errors.mkString("\n- ")}") + } + + Project(resolved, query) +} + +private def upcast(inAttr: NamedExpression, outAttr: Attribute): NamedExpression = { + Alias( +UpCast(inAttr, outAttr.dataType, Seq()), outAttr.name --- End diff -- The purpose of `UpCast` here is to prevent Spark from automatically inserting casts that could lose information, like `long` to `int` or `string` to `int`. I would support the same for `string` to `boolean` to catch destructive problems from accidental column alignment (in SQL) or similar errors. The main problem here is that Spark inserts casts instead of alerting the user that there's a problem. 
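As a rough illustration of the difference (a sketch, not the rule's code; `UpCast`'s third argument is the walked type path used only for error messages):
```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, UpCast}
import org.apache.spark.sql.types.{IntegerType, LongType}

val in = AttributeReference("id", LongType)()

// Cast analyzes successfully and silently truncates long values to int.
val lossy = Alias(Cast(in, IntegerType), "id")()

// UpCast is rewritten by ResolveUpCast, which throws an AnalysisException for a
// long -> int cast instead of letting the truncating cast through.
val checked = Alias(UpCast(in, IntegerType, Seq()), "id")()
```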
When the write succeeds, it may be a while before the user realizes the mistake, and by then the original data can't be recovered. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r200421789 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala --- @@ -203,33 +203,33 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { val path = file.getCanonicalPath assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty) -spark.range(10).select('id, -'id).write.format(cls.getName) +spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName) --- End diff -- Yes. The new resolution rule validates the dataframe that will be written to the table. Because this uses the `DataFrameWriter` API, it matches columns by name: there isn't a strong expectation of column ordering in the dataframe API (e.g., `withColumn` doesn't specify where the new column is added). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
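A small sketch of why name-based matching is the safer default for this API (`cls` and `path` stand in for the test's data source class and output path; the table's columns are assumed to be `i` and `j`):
```scala
import org.apache.spark.sql.functions.col

// withColumn always appends the new column at the end, so position-based matching
// could silently swap columns; matching by name keeps i -> i and j -> j.
val df = spark.range(10).toDF("i").withColumn("j", -col("i"))
df.write.format(cls.getName).option("path", path).save()
```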
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r200419939 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -82,6 +120,30 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: (n: String, v: Any) => FilterApi.eq( intColumn(n), Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull) + +case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal => --- End diff -- That doesn't validate the value against the decimal scale from the file, which is what I'm suggesting. The decimal scale must match exactly and this is a good place to check because this has the file information. If the scale doesn't match, then the schema used to read this file is incorrect, which would cause data corruption. In my opinion, it is better to add a check if it is cheap instead of debating whether or not some other part of the code covers the case. If this were happening per record then I would opt for a different strategy, but because this is at the file level it is a good idea to add it here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r200418152 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableCatalog.java --- @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.catalog; + +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.types.StructType; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public interface TableCatalog { + /** + * Load table metadata by {@link TableIdentifier identifier} from the catalog. + * + * @param ident a table identifier + * @return the table's metadata + * @throws NoSuchTableException If the table doesn't exist. + */ + Table loadTable(TableIdentifier ident) throws NoSuchTableException; + + /** + * Create a table in the catalog. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table already exists for the identifier + */ + default Table createTable(TableIdentifier ident, +StructType schema) throws TableAlreadyExistsException { +return createTable(ident, schema, Collections.emptyList(), Collections.emptyMap()); + } + + /** + * Create a table in the catalog. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @param properties a string map of table properties + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table already exists for the identifier + */ + default Table createTable(TableIdentifier ident, +StructType schema, +Map properties) throws TableAlreadyExistsException { +return createTable(ident, schema, Collections.emptyList(), properties); + } + + /** + * Create a table in the catalog. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @param partitions a list of expressions to use for partitioning data in the table + * @param properties a string map of table properties + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table already exists for the identifier + */ + Table createTable(TableIdentifier ident, +StructType schema, +List partitions, --- End diff -- I recommend reading the proposal SPIP's "Proposed Changes" section, which goes into more detail than this comment can. 
In short, you're thinking of partitions as columns, the way Hive tables do, but that is a narrow definition that prevents the underlying format from optimizing queries. Partitions of a table are derived from the column data through some transform. For example, partitioning by day uses a day transform on a timestamp column: `day(ts)`. Hive doesn't keep track of that transform and requires queries to handle it by inserting both `ts` and `day` columns. This leads to a few problems, including:

* Hive has no ability to transform `ts > X` into the partition predicate `day >= day(X)`. Queries that don't account for the table's physical storage by adding partition predicates by hand will result in full table scans.
* Users can insert any data they choose into the `day` partition, and it is up to them to do it correctly.

Also, consider bucketing. Bucketing is
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21305 @cloud-fan, can you also review this PR for DataSourceV2? This adds the first of the logical plans proposed in [SPIP: Standardize Logical Plans](https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d): `AppendData`. It replaces the current logical node for batch writes and adds schema validation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r200181894 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -82,6 +120,30 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: (n: String, v: Any) => FilterApi.eq( intColumn(n), Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull) + +case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal => --- End diff -- Since this uses the file schema, I think it should validate that the file uses the same scale as the value passed in. That's a cheap sanity check to ensure correctness. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r200181749 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -82,6 +120,30 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: (n: String, v: Any) => FilterApi.eq( intColumn(n), Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull) + +case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal => + (n: String, v: Any) => FilterApi.eq( +intColumn(n), + Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().intValue() + .asInstanceOf[Integer]).orNull) +case ParquetSchemaType(DECIMAL, INT64, decimal) if pushDownDecimal => + (n: String, v: Any) => FilterApi.eq( +longColumn(n), + Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().longValue() + .asInstanceOf[java.lang.Long]).orNull) +// Legacy DecimalType +case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, decimal) if pushDownDecimal && --- End diff -- The binary used for the legacy type and for fixed-length storage should be the same, so I don't understand why there are two different conversion methods. Also, because this is using the Parquet schema now, there's no need to base the length of this binary on what older versions of Spark did -- in other words, if the underlying Parquet type is fixed, then just convert the decimal to that size fixed without worrying about legacy types. I think this should pass in the fixed array's length and convert the BigDecimal value to that length array for all cases. That works no matter what the file contains. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
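A hedged sketch of the conversion being suggested: take the fixed array length from the Parquet schema and right-align the unscaled bytes into it (the helper name and structure are illustrative, not the code in this PR, and it assumes the value fits in the given length):
```scala
import java.math.{BigDecimal => JBigDecimal}
import org.apache.parquet.io.api.Binary

def decimalToFixedLenBinary(decimal: JBigDecimal, fixedLength: Int): Binary = {
  // big-endian two's complement bytes of the unscaled value
  val unscaled = decimal.unscaledValue().toByteArray
  val bytes = new Array[Byte](fixedLength)
  // sign-extend the leading bytes, then copy the unscaled value right-aligned
  val signByte = if (unscaled(0) < 0) (-1).toByte else 0.toByte
  java.util.Arrays.fill(bytes, 0, fixedLength - unscaled.length, signByte)
  System.arraycopy(unscaled, 0, bytes, fixedLength - unscaled.length, unscaled.length)
  Binary.fromReusedByteArray(bytes)
}
```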
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...
Github user rdblue closed the pull request at: https://github.com/apache/spark/pull/21306 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...
GitHub user rdblue reopened a pull request: https://github.com/apache/spark/pull/21306 [SPARK-24252][SQL] Add DataSourceV2 mix-in for catalog support. ## What changes were proposed in this pull request? This adds a mix-in to `DataSourceV2` that allows implementations to support catalog operations: load table (for schema), create table, drop table, and alter table. This does not include the proposed transactional API. ## How was this patch tested? Work in progress. This is for discussion right now. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/spark SPARK-24252-add-datasource-v2-catalog Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21306.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21306 commit 915d72f8a3cf8df428ecdac25f30545d963ee5f7 Author: Ryan Blue Date: 2018-05-05T01:13:01Z SPARK-24252: Add v2 data source mix-in for catalog support. commit fb55395a657588d1f87b13dbb1793d13991dc2b0 Author: Ryan Blue Date: 2018-05-11T21:27:47Z SPARK-24252: Add copyright headers. commit 42ed4a4a138e5c5f681755d871fd9d9030a4619a Author: Ryan Blue Date: 2018-07-04T17:02:52Z SPARK-24252: Update for review comments. * Rename CatalogSupport to TableSupport * Rename DataSourceCatalog to TableCatalog * Remove name and database from Table commit 023995d15b4293fac1530da6bd966b6ab6823980 Author: Ryan Blue Date: 2018-07-04T17:19:45Z SPARK-24252: Add TableChange example to Javadocs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for catalog s...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21306 @cloud-fan, I've updated this to address your comments. Thanks for the reviews! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r200178463 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableChange.java --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.catalog; + +import org.apache.spark.sql.types.DataType; + +/** + * TableChange subclasses represent requested changes to a table. These are passed to + * {@link DataSourceCatalog#alterTable}. + */ +public interface TableChange { + + /** + * Create a TableChange for adding a top-level column to a table. + * + * Because "." may be interpreted as a field path separator or may be used in field names, it is + * not allowed in names passed to this method. To add to nested types or to add fields with + * names that contain ".", use {@link #addColumn(String, String, DataType)}. + * + * @param name the new top-level column name + * @param dataType the new column's data type + * @return a TableChange for the addition + */ + static TableChange addColumn(String name, DataType dataType) { +return new AddColumn(null, name, dataType); + } + + /** + * Create a TableChange for adding a nested column to a table. + * + * The parent name is used to find the parent struct type where the nested field will be added. + * If the parent name is null, the new column will be added to the root as a top-level column. + * If parent identifies a struct, a new column is added to that struct. If it identifies a list, + * the column is added to the list element struct, and if it identifies a map, the new column is + * added to the map's value struct. + * + * The given name is used to name the new column and names containing "." are not handled + * differently. + * + * @param parent the new field's parent + * @param name the new field name + * @param dataType the new field's data type + * @return a TableChange for the addition + */ + static TableChange addColumn(String parent, String name, DataType dataType) { +return new AddColumn(parent, name, dataType); + } + + /** + * Create a TableChange for renaming a field. + * + * The name is used to find the field to rename. The new name will replace the name of the type. + * For example, renameColumn("a.b.c", "x") should produce column a.b.x. --- End diff -- I added an example to the Javadocs: ```scala import TableChange._ val catalog = source.asInstanceOf[TableSupport].catalog() catalog.alterTable(ident, addColumn("x", IntegerType), renameColumn("a", "b"), deleteColumn("c") ) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r200177371 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableChange.java --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.catalog; + +import org.apache.spark.sql.types.DataType; + +/** + * TableChange subclasses represent requested changes to a table. These are passed to + * {@link DataSourceCatalog#alterTable}. + */ +public interface TableChange { + + /** + * Create a TableChange for adding a top-level column to a table. + * + * Because "." may be interpreted as a field path separator or may be used in field names, it is + * not allowed in names passed to this method. To add to nested types or to add fields with + * names that contain ".", use {@link #addColumn(String, String, DataType)}. + * + * @param name the new top-level column name + * @param dataType the new column's data type + * @return a TableChange for the addition + */ + static TableChange addColumn(String name, DataType dataType) { +return new AddColumn(null, name, dataType); + } + + /** + * Create a TableChange for adding a nested column to a table. + * + * The parent name is used to find the parent struct type where the nested field will be added. + * If the parent name is null, the new column will be added to the root as a top-level column. + * If parent identifies a struct, a new column is added to that struct. If it identifies a list, + * the column is added to the list element struct, and if it identifies a map, the new column is + * added to the map's value struct. + * + * The given name is used to name the new column and names containing "." are not handled + * differently. + * + * @param parent the new field's parent + * @param name the new field name + * @param dataType the new field's data type + * @return a TableChange for the addition + */ + static TableChange addColumn(String parent, String name, DataType dataType) { +return new AddColumn(parent, name, dataType); + } + + /** + * Create a TableChange for renaming a field. + * + * The name is used to find the field to rename. The new name will replace the name of the type. + * For example, renameColumn("a.b.c", "x") should produce column a.b.x. + * + * @param name the current field name + * @param newName the new name + * @return a TableChange for the rename + */ + static TableChange renameColumn(String name, String newName) { +return new RenameColumn(name, newName); + } + + /** + * Create a TableChange for updating the type of a field. + * + * The name is used to find the field to update. 
+ * + * @param name the field name + * @param newDataType the new data type + * @return a TableChange for the update + */ + static TableChange updateColumn(String name, DataType newDataType) { +return new UpdateColumn(name, newDataType); + } + + /** + * Create a TableChange for deleting a field from a table. + * + * @param name the name of the field to delete + * @return a TableChange for the delete + */ + static TableChange deleteColumn(String name) { +return new DeleteColumn(name); + } + + final class AddColumn implements TableChange { --- End diff -- And, I'm not sure it's possible to implement unapply in Java. Not even implementing Product works. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
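Without `unapply`, Scala callers could still dispatch on the change type with type patterns — a sketch, assuming accessors like `name()`, `dataType()`, and `newName()` exist on the Java classes (the actual accessor names in this PR may differ):
```scala
import org.apache.spark.sql.sources.v2.catalog.TableChange
import org.apache.spark.sql.sources.v2.catalog.TableChange.{AddColumn, DeleteColumn, RenameColumn}

// Type patterns work against the Java classes even without extractors.
def describe(change: TableChange): String = change match {
  case add: AddColumn       => s"add column ${add.name} of type ${add.dataType}"
  case rename: RenameColumn => s"rename ${rename.name} to ${rename.newName}"
  case delete: DeleteColumn => s"delete column ${delete.name}"
  case other                => s"unknown change: $other"
}
```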
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r200174526 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableChange.java --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.catalog; + +import org.apache.spark.sql.types.DataType; + +/** + * TableChange subclasses represent requested changes to a table. These are passed to + * {@link DataSourceCatalog#alterTable}. + */ +public interface TableChange { + + /** + * Create a TableChange for adding a top-level column to a table. + * + * Because "." may be interpreted as a field path separator or may be used in field names, it is + * not allowed in names passed to this method. To add to nested types or to add fields with + * names that contain ".", use {@link #addColumn(String, String, DataType)}. + * + * @param name the new top-level column name + * @param dataType the new column's data type + * @return a TableChange for the addition + */ + static TableChange addColumn(String name, DataType dataType) { +return new AddColumn(null, name, dataType); + } + + /** + * Create a TableChange for adding a nested column to a table. + * + * The parent name is used to find the parent struct type where the nested field will be added. + * If the parent name is null, the new column will be added to the root as a top-level column. + * If parent identifies a struct, a new column is added to that struct. If it identifies a list, + * the column is added to the list element struct, and if it identifies a map, the new column is + * added to the map's value struct. + * + * The given name is used to name the new column and names containing "." are not handled + * differently. + * + * @param parent the new field's parent + * @param name the new field name + * @param dataType the new field's data type + * @return a TableChange for the addition + */ + static TableChange addColumn(String parent, String name, DataType dataType) { +return new AddColumn(parent, name, dataType); + } + + /** + * Create a TableChange for renaming a field. + * + * The name is used to find the field to rename. The new name will replace the name of the type. + * For example, renameColumn("a.b.c", "x") should produce column a.b.x. + * + * @param name the current field name + * @param newName the new name + * @return a TableChange for the rename + */ + static TableChange renameColumn(String name, String newName) { +return new RenameColumn(name, newName); + } + + /** + * Create a TableChange for updating the type of a field. + * + * The name is used to find the field to update. 
+ * + * @param name the field name + * @param newDataType the new data type + * @return a TableChange for the update + */ + static TableChange updateColumn(String name, DataType newDataType) { +return new UpdateColumn(name, newDataType); + } + + /** + * Create a TableChange for deleting a field from a table. + * + * @param name the name of the field to delete + * @return a TableChange for the delete + */ + static TableChange deleteColumn(String name) { +return new DeleteColumn(name); + } + + final class AddColumn implements TableChange { --- End diff -- Nevermind, I forgot that these are in an interface so they are automatically public. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r200173138 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/Table.java --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.catalog; + +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.types.StructType; + +import java.util.List; +import java.util.Map; + +/** + * Represents table metadata from a {@link DataSourceCatalog}. + */ +public interface Table { --- End diff -- I'll just remove `name` and `database`. We can add them later when we figure out how we want to handle it. We need partitioning right away, though. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r200171696 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableChange.java --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.catalog; + +import org.apache.spark.sql.types.DataType; + +/** + * TableChange subclasses represent requested changes to a table. These are passed to + * {@link DataSourceCatalog#alterTable}. + */ +public interface TableChange { + + /** + * Create a TableChange for adding a top-level column to a table. + * + * Because "." may be interpreted as a field path separator or may be used in field names, it is + * not allowed in names passed to this method. To add to nested types or to add fields with + * names that contain ".", use {@link #addColumn(String, String, DataType)}. + * + * @param name the new top-level column name + * @param dataType the new column's data type + * @return a TableChange for the addition + */ + static TableChange addColumn(String name, DataType dataType) { +return new AddColumn(null, name, dataType); + } + + /** + * Create a TableChange for adding a nested column to a table. + * + * The parent name is used to find the parent struct type where the nested field will be added. + * If the parent name is null, the new column will be added to the root as a top-level column. + * If parent identifies a struct, a new column is added to that struct. If it identifies a list, + * the column is added to the list element struct, and if it identifies a map, the new column is + * added to the map's value struct. + * + * The given name is used to name the new column and names containing "." are not handled + * differently. + * + * @param parent the new field's parent + * @param name the new field name + * @param dataType the new field's data type + * @return a TableChange for the addition + */ + static TableChange addColumn(String parent, String name, DataType dataType) { +return new AddColumn(parent, name, dataType); + } + + /** + * Create a TableChange for renaming a field. + * + * The name is used to find the field to rename. The new name will replace the name of the type. + * For example, renameColumn("a.b.c", "x") should produce column a.b.x. + * + * @param name the current field name + * @param newName the new name + * @return a TableChange for the rename + */ + static TableChange renameColumn(String name, String newName) { +return new RenameColumn(name, newName); + } + + /** + * Create a TableChange for updating the type of a field. + * + * The name is used to find the field to update. 
+ * + * @param name the field name + * @param newDataType the new data type + * @return a TableChange for the update + */ + static TableChange updateColumn(String name, DataType newDataType) { +return new UpdateColumn(name, newDataType); + } + + /** + * Create a TableChange for deleting a field from a table. + * + * @param name the name of the field to delete + * @return a TableChange for the delete + */ + static TableChange deleteColumn(String name) { +return new DeleteColumn(name); + } + + final class AddColumn implements TableChange { --- End diff -- Just noticed that these aren't public, but should be because they will be passed to implementations through `alterTable`. These should also implement `unapply` for Scala implementations. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r200171424 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/Table.java --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.catalog; + +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.types.StructType; + +import java.util.List; +import java.util.Map; + +/** + * Represents table metadata from a {@link DataSourceCatalog}. + */ +public interface Table { --- End diff -- I updated the last comment because I thought this was referring to `CatalogSupport` at first. Sorry about the confusion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r200170560 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableChange.java --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.catalog; + +import org.apache.spark.sql.types.DataType; + +/** + * TableChange subclasses represent requested changes to a table. These are passed to + * {@link DataSourceCatalog#alterTable}. + */ +public interface TableChange { + + /** + * Create a TableChange for adding a top-level column to a table. + * + * Because "." may be interpreted as a field path separator or may be used in field names, it is + * not allowed in names passed to this method. To add to nested types or to add fields with + * names that contain ".", use {@link #addColumn(String, String, DataType)}. + * + * @param name the new top-level column name + * @param dataType the new column's data type + * @return a TableChange for the addition + */ + static TableChange addColumn(String name, DataType dataType) { +return new AddColumn(null, name, dataType); + } + + /** + * Create a TableChange for adding a nested column to a table. + * + * The parent name is used to find the parent struct type where the nested field will be added. + * If the parent name is null, the new column will be added to the root as a top-level column. + * If parent identifies a struct, a new column is added to that struct. If it identifies a list, + * the column is added to the list element struct, and if it identifies a map, the new column is + * added to the map's value struct. + * + * The given name is used to name the new column and names containing "." are not handled + * differently. + * + * @param parent the new field's parent + * @param name the new field name + * @param dataType the new field's data type + * @return a TableChange for the addition + */ + static TableChange addColumn(String parent, String name, DataType dataType) { +return new AddColumn(parent, name, dataType); + } + + /** + * Create a TableChange for renaming a field. + * + * The name is used to find the field to rename. The new name will replace the name of the type. + * For example, renameColumn("a.b.c", "x") should produce column a.b.x. --- End diff -- Are you looking for examples in Javadoc, or an example implementation? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r200170480 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/Table.java --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.catalog; + +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.types.StructType; + +import java.util.List; +import java.util.Map; + +/** + * Represents table metadata from a {@link DataSourceCatalog}. + */ +public interface Table { --- End diff -- This interface is for named tables, not path-based tables. I think that would probably be a different interface because not all sources can support path-based tables. Cassandra and JDBC are good examples. For the table metadata, I think we do need partitions. Iceberg creates partitioned tables and I'd like to start getting the DDL operations working. This is why I proposed this metadata on the SPIP a few months ago. We seem to have lazy consensus around it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r200170491 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/CatalogSupport.java --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.sql.sources.v2.catalog.DataSourceCatalog; + +/** + * A mix-in interface for {@link DataSourceV2} catalog support. Data sources can implement this + * interface to provide the ability to load, create, alter, and drop tables. + * + * Data sources must implement this interface to support logical operations that combine writing + * data with catalog tasks, like create-table-as-select. + */ +public interface CatalogSupport { --- End diff -- Works for me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21696 Thanks, @wangyum! I think this refactor was a good idea. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21696#discussion_r199980993 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -19,166 +19,186 @@ package org.apache.spark.sql.execution.datasources.parquet import java.sql.Date +import scala.collection.JavaConverters._ + import org.apache.parquet.filter2.predicate._ import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.io.api.Binary -import org.apache.parquet.schema.PrimitiveComparator +import org.apache.parquet.schema._ +import org.apache.parquet.schema.OriginalType._ +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate import org.apache.spark.sql.sources -import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String /** * Some utility function to convert Spark data source filters to Parquet filters. */ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) { + case class ParquetSchemaType( + originalType: OriginalType, + primitiveTypeName: PrimitiveType.PrimitiveTypeName, + decimalMetadata: DecimalMetadata) + private def dateToDays(date: Date): SQLDate = { DateTimeUtils.fromJavaDate(date) } - private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { -case BooleanType => + private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { +// BooleanType +case ParquetSchemaType(null, BOOLEAN, null) => (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean]) -case IntegerType => +// IntegerType +case ParquetSchemaType(null, INT32, null) => (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer]) -case LongType => +// LongType +case ParquetSchemaType(null, INT64, null) => (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long]) -case FloatType => +// FloatType +case ParquetSchemaType(null, FLOAT, null) => (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float]) -case DoubleType => +// DoubleType +case ParquetSchemaType(null, DOUBLE, null) => (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double]) +// StringType // Binary.fromString and Binary.fromByteArray don't accept null values -case StringType => +case ParquetSchemaType(UTF8, BINARY, null) => (n: String, v: Any) => FilterApi.eq( binaryColumn(n), Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull) -case BinaryType => +// BinaryType +case ParquetSchemaType(null, BINARY, null) => (n: String, v: Any) => FilterApi.eq( binaryColumn(n), Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull) -case DateType if pushDownDate => +// DateType +case ParquetSchemaType(DATE, INT32, null) if pushDownDate => (n: String, v: Any) => FilterApi.eq( intColumn(n), Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull) } - private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { -case BooleanType => + private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { +case ParquetSchemaType(null, BOOLEAN, null) => (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean]) -case IntegerType => +case ParquetSchemaType(null, INT32, null) 
=> (n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer]) -case LongType => +case ParquetSchemaType(null, INT64, null) => (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long]) -case FloatType => +case ParquetSchemaType(null, FLOAT, null) => (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float]) -case DoubleType => +case ParquetSchemaType(null, DOUBLE, null) => (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21696#discussion_r199980897 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -19,166 +19,186 @@ package org.apache.spark.sql.execution.datasources.parquet import java.sql.Date +import scala.collection.JavaConverters._ + import org.apache.parquet.filter2.predicate._ import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.io.api.Binary -import org.apache.parquet.schema.PrimitiveComparator +import org.apache.parquet.schema._ +import org.apache.parquet.schema.OriginalType._ +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate import org.apache.spark.sql.sources -import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String /** * Some utility function to convert Spark data source filters to Parquet filters. */ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) { + case class ParquetSchemaType( + originalType: OriginalType, + primitiveTypeName: PrimitiveType.PrimitiveTypeName, + decimalMetadata: DecimalMetadata) + private def dateToDays(date: Date): SQLDate = { DateTimeUtils.fromJavaDate(date) } - private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { -case BooleanType => + private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { +// BooleanType --- End diff -- The other partial functions don't have these comments. Is that on purpose? Maybe these should be constants instead to make the code more readable and consistent? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
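A sketch of the constants idea, assuming the `ParquetSchemaType` case class and imports from the diff above (the names are illustrative):
```scala
// Stable identifiers that start with an uppercase letter can be used directly in
// pattern matches, so the tuples don't have to be repeated in every partial function.
private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
private val ParquetLongType = ParquetSchemaType(null, INT64, null)
private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)

// e.g. in makeEq:
//   case ParquetBooleanType =>
//     (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
```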
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21696#discussion_r199980632 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -19,166 +19,186 @@ package org.apache.spark.sql.execution.datasources.parquet import java.sql.Date +import scala.collection.JavaConverters._ + import org.apache.parquet.filter2.predicate._ import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.io.api.Binary -import org.apache.parquet.schema.PrimitiveComparator +import org.apache.parquet.schema._ +import org.apache.parquet.schema.OriginalType._ +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate import org.apache.spark.sql.sources -import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String /** * Some utility function to convert Spark data source filters to Parquet filters. */ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) { + case class ParquetSchemaType( + originalType: OriginalType, + primitiveTypeName: PrimitiveType.PrimitiveTypeName, + decimalMetadata: DecimalMetadata) + private def dateToDays(date: Date): SQLDate = { DateTimeUtils.fromJavaDate(date) } - private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { -case BooleanType => + private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { +// BooleanType +case ParquetSchemaType(null, BOOLEAN, null) => (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean]) -case IntegerType => +// IntegerType +case ParquetSchemaType(null, INT32, null) => --- End diff -- Before, it was a valid assumption that the value's type matched the `DataType`. Now that this is the file's type, that might not be the case. For example, byte and short are stored as INT32. This should cast to Number and then convert to the file's type. I would also do this for INT64 columns, in case the schema has evolved and a column that was INT32 is now INT64. The converters (used to materialize records) don't currently support this, but it would be reasonable for them to support it eventually. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
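A sketch of that suggestion, as a fragment of `makeEq` keyed on the file's physical type (illustrative, not the final code):
```scala
// Convert the pushed-down value through Number so byte, short, int, and widened
// int -> long columns all produce a predicate in the file's physical type.
case ParquetSchemaType(null, INT32, null) =>
  (n: String, v: Any) => FilterApi.eq(
    intColumn(n),
    Option(v).map(_.asInstanceOf[Number].intValue.asInstanceOf[Integer]).orNull)

case ParquetSchemaType(null, INT64, null) =>
  (n: String, v: Any) => FilterApi.eq(
    longColumn(n),
    Option(v).map(_.asInstanceOf[Number].longValue.asInstanceOf[java.lang.Long]).orNull)
```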
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21696#discussion_r199979463 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala --- @@ -379,14 +366,29 @@ class ParquetFileFormat null) val sharedConf = broadcastedHadoopConf.value.value + + val fileMetaData = +ParquetFileReader.readFooter(sharedConf, fileSplit.getPath, SKIP_ROW_GROUPS).getFileMetaData --- End diff -- We should *always* read the footer and adjust the filters to match. In our version we use the following to handle situations where the column names have changed case or where Hive has returned a different case:
```
// Try to push down filters when filter push-down is enabled.
val pushed = if (pushdownEnabled) {
  // read the file schema to create Parquet filters that match case
  val fileReader = ParquetFileReader.open(conf, fileSplit.getPath)
  val fileSchema = try {
    new ParquetToSparkSchemaConverter(conf).convert(
      ParquetReadSupport.clipParquetSchema(
        fileReader.getFileMetaData.getSchema, requiredSchema))
  } finally {
    fileReader.close()
  }

  filters
    // Collects all converted Parquet filter predicates. Notice that not all predicates can
    // be converted (`ParquetFilters.createFilter` returns an `Option`). That's why a
    // `flatMap` is used here.
    .flatMap(ParquetFilters.createFilter(fileSchema, _))
    .reduceOption(FilterApi.and)
} else {
  None
}
```
That's pretty much the same thing as here. In addition, if columns can be renamed within a Parquet file then you need to push filters for the names used in the file's schema. I don't think that's a problem here because I don't know of a way to rename columns in a Spark Parquet table. (Iceberg uses field IDs to do this.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21682: [SPARK-24706][SQL] ByteType and ShortType support pushdo...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21682 +1 I agree with some of the minor refactoring suggestions, but overall this looks correct to me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21682: [SPARK-24706][SQL] ByteType and ShortType support...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21682#discussion_r199977784 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -69,6 +77,14 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { case BooleanType => (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean]) +case ByteType => --- End diff -- Usually, both byte and short would be stored as integers in Parquet. Because Parquet uses bit packing, it doesn't matter if you store them as ints (or even longs) because they'll get packed into the same space. The important thing is to match the Parquet file's type when pushing a filter. Since Spark stores ByteType and ShortType in Parquet as INT32, this is correct. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
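A tiny, hedged example of what that looks like at the Parquet API level: the predicate for a byte value is built against the INT32 column that physically stores it (column name is made up):
```scala
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.FilterApi.intColumn

val byteValue: java.lang.Byte = 5.toByte
// The filter is an int predicate because the file stores ByteType as INT32.
val pred = FilterApi.eq(intColumn("b"), Integer.valueOf(byteValue.intValue))
```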
[GitHub] spark pull request #21682: [SPARK-24706][SQL] ByteType and ShortType support...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21682#discussion_r199977420 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -42,6 +42,14 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { case BooleanType => (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean]) +case ByteType => + (n: String, v: Any) => FilterApi.eq( +intColumn(n), +Option(v).map(b => b.asInstanceOf[java.lang.Byte].toInt.asInstanceOf[Integer]).orNull) --- End diff -- I agree. Also, there's no need to use `Option.map` because the value cannot be null. That's why the `IntegerType` case just casts the value. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for catalog s...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21306 @cloud-fan, thanks for the thorough feedback! > What catalog operations we want to forward to the data source catalog? Currently it's create/drop/alter table, I think it's good enough for now. This PR introduces create, drop, and alter. We can always add more later. These are the ones that we need to implement DataSourceV2 operations and DDL support. > Spark provides an API so that end-users can do it directly. e.g. `spark.catalog("iceberge").createTable(...)`, or SQL API `CREATE TABLE iceberge.db1.tbl1 . . .` These two are the easiest and least intrusive way to start because the data source catalog interaction is explicitly tied to a catalog. It also matches the behavior used by other systems for multiple catalogs. I think this is what we should start with and then tackle ideas like your second point. > When creating/dropping/altering Spark tables, also forward it to the data source catalog. . . For this and a couple other questions, I don't think we need to decide right now. This PR is about getting the interface for other sources in Spark. We don't necessarily need to know all of the ways that users will call it or interact with it, like how `DESC TABLE` will work. To your question here, I'm not sure whether the `CREATE TABLE ... USING source` syntax should use the default catalog or defer to the catalog for `source` or forward to both, but that doesn't need to block adding this API because I think we can decide it later. In addition, we should probably discuss this on the dev list to make sure we get the behavior right. > How to lookup the table metadata from data source catalog? The SPIP proposes two catalog interfaces that return `Table`. One that uses table identifiers and one that uses paths. Data sources can implement support for both or just one. This PR includes just the support for table identifiers. We would add a similar API for path-based tables in another PR. > How to define table metadata? Maybe we can forward `DESC TABLE` . . . That sounds like a reasonable idea to me. Like the behavior of `USING`, I don't think this is something that we have to decide right now. We can add support later as we implement table DDL. Maybe `Table` should return a DF that is its `DESCRIBE` output. > How does the table metadata involve in data reading/writing? This is another example of something we don't need to decide yet. We have a couple different options for the behavior and will want to think them through and discuss them on the dev list. But I don't think that the behavior necessarily needs to be decided before we add this API to sources. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
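As a purely illustrative sketch of the identifier-based catalog shape described above (the names `Table`, `TableChange`, and these signatures are placeholders, not the SPIP's actual API):
```
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.StructType

// Placeholder types, defined here only so the sketch is self-contained.
trait Table { def schema: StructType }
trait TableChange

// A catalog mix-in that resolves tables by identifier and supports create/drop/alter.
trait TableCatalog {
  def loadTable(ident: TableIdentifier): Table
  def createTable(ident: TableIdentifier, schema: StructType, properties: Map[String, String]): Table
  def alterTable(ident: TableIdentifier, changes: Seq[TableChange]): Table
  def dropTable(ident: TableIdentifier): Boolean
}
```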
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r198907669 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala --- @@ -359,6 +369,70 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } + test("filter pushdown - decimal") { +Seq(true, false).foreach { legacyFormat => + withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> legacyFormat.toString) { +Seq(s"_1 decimal(${Decimal.MAX_INT_DIGITS}, 2)", // 32BitDecimalType + s"_1 decimal(${Decimal.MAX_LONG_DIGITS}, 2)", // 64BitDecimalType + "_1 decimal(38, 18)" // ByteArrayDecimalType +).foreach { schemaDDL => + val schema = StructType.fromDDL(schemaDDL) + val rdd = +spark.sparkContext.parallelize((1 to 4).map(i => Row(new java.math.BigDecimal(i + val dataFrame = spark.createDataFrame(rdd, schema) + testDecimalPushDown(dataFrame) { implicit df => +assert(df.schema === schema) +checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) +checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_))) + +checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1) +checkFilterPredicate('_1 <=> 1, classOf[Eq[_]], 1) +checkFilterPredicate('_1 =!= 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_))) + +checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1) +checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4) +checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1) +checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4) + +checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1) +checkFilterPredicate(Literal(1) <=> '_1, classOf[Eq[_]], 1) +checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1) +checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4) +checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1) +checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4) + +checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4) +checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4))) + } +} + } +} + } + + test("incompatible parquet file format will throw exeception") { --- End diff -- If we can detect the case where the data is written with the legacy format, then why do we need a property to read with the legacy format? Why not do the right thing without a property? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r198906232 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala --- @@ -359,6 +369,70 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } + test("filter pushdown - decimal") { +Seq(true, false).foreach { legacyFormat => + withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> legacyFormat.toString) { +Seq(s"_1 decimal(${Decimal.MAX_INT_DIGITS}, 2)", // 32BitDecimalType --- End diff -- Since this is providing a column name, it would be better to use something more readable than _1. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r198904779 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -62,6 +98,30 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean) { (n: String, v: Any) => FilterApi.eq( intColumn(n), Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull) +case decimal: DecimalType + if pushDownDecimal && (DecimalType.is32BitDecimalType(decimal) && !readLegacyFormat) => + (n: String, v: Any) => FilterApi.eq( +intColumn(n), + Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().intValue() --- End diff -- Does this need to validate the scale of the decimal, or is scale adjusted in the analyzer? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r198904504 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -62,6 +98,30 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean) { (n: String, v: Any) => FilterApi.eq( intColumn(n), Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull) +case decimal: DecimalType + if pushDownDecimal && (DecimalType.is32BitDecimalType(decimal) && !readLegacyFormat) => + (n: String, v: Any) => FilterApi.eq( +intColumn(n), + Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().intValue() + .asInstanceOf[Integer]).orNull) +case decimal: DecimalType + if pushDownDecimal && (DecimalType.is64BitDecimalType(decimal) && !readLegacyFormat) => + (n: String, v: Any) => FilterApi.eq( +longColumn(n), + Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().longValue() +.asInstanceOf[java.lang.Long]).orNull) +case decimal: DecimalType + if pushDownDecimal && ((DecimalType.is32BitDecimalType(decimal) && readLegacyFormat) --- End diff -- Please add comments here to explain what differs when `readLegacyFormat` is true. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
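For context, a hedged sketch of the conversion the legacy branch needs (assumption: with the legacy writer format, 32/64-bit decimals are stored as FIXED_LEN_BYTE_ARRAY rather than INT32/INT64, so the filter value has to be sign-extended to the writer's fixed byte width; `Decimal.minBytesForPrecision` is assumed to be the width helper the writer uses):
```
import java.math.{BigDecimal => JBigDecimal}

import org.apache.parquet.io.api.Binary

import org.apache.spark.sql.types.Decimal

// Sketch only: encode a decimal filter value the way the legacy writer stores it, as a
// fixed-length, big-endian, two's-complement byte array of the unscaled value.
def decimalToBinary(precision: Int, decimal: JBigDecimal): Binary = {
  val byteWidth = Decimal.minBytesForPrecision(precision)  // width assumed to match the writer
  val unscaled = decimal.unscaledValue().toByteArray       // big-endian two's complement
  val fixed = new Array[Byte](byteWidth)
  // Sign-extend on the left so the value occupies exactly byteWidth bytes.
  val signByte = (if (unscaled.head < 0) -1 else 0).toByte
  java.util.Arrays.fill(fixed, 0, byteWidth - unscaled.length, signByte)
  System.arraycopy(unscaled, 0, fixed, byteWidth - unscaled.length, unscaled.length)
  Binary.fromConstantByteArray(fixed)
}
```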
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r198904089 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -378,6 +378,22 @@ object SQLConf { .booleanConf .createWithDefault(true) + val PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED = +buildConf("spark.sql.parquet.filterPushdown.decimal") + .doc(s"If true, enables Parquet filter push-down optimization for Decimal. " + +"The default value is false to compatible with legacy parquet format. " + +s"This configuration only has an effect when '${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is " + +"enabled and Decimal statistics are generated(Since Spark 2.4).") + .internal() + .booleanConf + .createWithDefault(true) + + val PARQUET_READ_LEGACY_FORMAT = buildConf("spark.sql.parquet.readLegacyFormat") --- End diff -- This property doesn't mention pushdown, but the description says it is only valid for push-down. Can you make the property name more clear? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21623: [SPARK-24638][SQL] StringStartsWith support push down
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21623 Overall, I think this is close. The tests need to cover the row group stats case and we should update how configuration is passed to the filters. Thanks for working on this, @wangyum! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21623: [SPARK-24638][SQL] StringStartsWith support push ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21623#discussion_r198553569 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -22,16 +22,23 @@ import java.sql.Date import org.apache.parquet.filter2.predicate._ import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.io.api.Binary +import org.apache.parquet.schema.PrimitiveComparator import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String /** * Some utility function to convert Spark data source filters to Parquet filters. */ -private[parquet] class ParquetFilters(pushDownDate: Boolean) { +private[parquet] class ParquetFilters() { + + val sqlConf: SQLConf = SQLConf.get --- End diff -- This should pass in `pushDownDate` and `pushDownStartWith` like the previous version did with just the date setting. The SQLConf is already available in ParquetFileFormat and it *would* be better to pass it in. The problem is that this class is instantiated in the function (`(file: PartitionedFile) => { ... }`) that gets serialized and sent to executors. That means we don't want SQLConf and its references in the function's closure. The way we got around this before was to put boolean config vals in the closure instead. I think you should go with that approach. I'm not sure what `SQLConf.get` is for or what a correct use would be. @gatorsmile, can you comment on use of `SQLConf.get`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
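To illustrate the suggested plumbing (a sketch only; it assumes this sits inside `ParquetFileFormat.buildReaderWithPartitionValues` where `sparkSession` is in scope, and the exact SQLConf accessor names are assumptions):
```
// Read the booleans from SQLConf on the driver, before the reader function is built,
// so the task closure captures two primitives instead of SQLConf itself.
val sqlConf = sparkSession.sessionState.conf
val pushDownDate = sqlConf.parquetFilterPushDownDate                  // accessor name assumed
val pushDownStartWith = sqlConf.parquetFilterPushDownStringStartWith  // accessor name assumed

(file: PartitionedFile) => {
  // Runs on executors; only the two booleans were serialized with the closure.
  val parquetFilters = new ParquetFilters(pushDownDate, pushDownStartWith)
  // ... build pushed predicates with parquetFilters.createFilter(...) as before
}
```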
[GitHub] spark pull request #21623: [SPARK-24638][SQL] StringStartsWith support push ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21623#discussion_r198551889 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala --- @@ -660,6 +661,56 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex assert(df.where("col > 0").count() === 2) } } + + test("filter pushdown - StringStartsWith") { +withParquetDataFrame((1 to 4).map(i => Tuple1(i + "str" + i))) { implicit df => --- End diff -- I think that all of these tests go through the `keep` method instead of the `canDrop` and `inverseCanDrop`. I think those methods need to be tested. You can do that by constructing a Parquet file with row groups that have predictable statistics, but that would be difficult. An easier way to do this is to define the predicate class elsewhere and create a unit test for it that passes in different statistics values. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21262: [SPARK-24172][SQL]: Push projection and filters o...
Github user rdblue closed the pull request at: https://github.com/apache/spark/pull/21262 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for catalog s...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21306 @cloud-fan, what needs to change to get this in? I'd like to start making more PRs based on these changes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21623: [SPARK-24638][SQL] StringStartsWith support push ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21623#discussion_r198244664 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -270,6 +277,29 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean) { case sources.Not(pred) => createFilter(schema, pred).map(FilterApi.not) + case sources.StringStartsWith(name, prefix) if pushDownStartWith && canMakeFilterOn(name) => +Option(prefix).map { v => + FilterApi.userDefined(binaryColumn(name), +new UserDefinedPredicate[Binary] with Serializable { + private val strToBinary = Binary.fromReusedByteArray(v.getBytes) + private val size = strToBinary.length + + override def canDrop(statistics: Statistics[Binary]): Boolean = { +val comparator = PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR +val max = statistics.getMax +val min = statistics.getMin +comparator.compare(max.slice(0, math.min(size, max.length)), strToBinary) < 0 || + comparator.compare(min.slice(0, math.min(size, min.length)), strToBinary) > 0 + } + + override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false --- End diff -- Sorry, I meant if the min and max both *include* the prefix, then we should be able to drop the range. The situation is where both min and max match, so all values must also match the filter. If we are looking for values that do not match the filter, then we can eliminate the row group. The example is prefix=CCC and values are between min=CCCa and max=CCCZ: all values start with CCC, so the entire row group can be skipped. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
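To make that concrete, a sketch of what `inverseCanDrop` could check (illustrative, not the PR's code; `strToBinary` and `size` are the fields of the user-defined predicate shown in the diff above):
```
override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = {
  val comparator = PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR
  val max = statistics.getMax
  val min = statistics.getMin
  // If both bounds start with the prefix, every value in the row group does too,
  // so a NOT(startsWith) filter can never match anything in this group.
  max.length >= size && min.length >= size &&
    comparator.compare(max.slice(0, size), strToBinary) == 0 &&
    comparator.compare(min.slice(0, size), strToBinary) == 0
}
```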
[GitHub] spark pull request #21623: [SPARK-24638][SQL] StringStartsWith support push ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21623#discussion_r198230713 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -270,6 +277,29 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean) { case sources.Not(pred) => createFilter(schema, pred).map(FilterApi.not) + case sources.StringStartsWith(name, prefix) if pushDownStartWith && canMakeFilterOn(name) => +Option(prefix).map { v => + FilterApi.userDefined(binaryColumn(name), +new UserDefinedPredicate[Binary] with Serializable { + private val strToBinary = Binary.fromReusedByteArray(v.getBytes) + private val size = strToBinary.length + + override def canDrop(statistics: Statistics[Binary]): Boolean = { +val comparator = PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR +val max = statistics.getMax +val min = statistics.getMin +comparator.compare(max.slice(0, math.min(size, max.length)), strToBinary) < 0 || + comparator.compare(min.slice(0, math.min(size, min.length)), strToBinary) > 0 + } + + override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false --- End diff -- Why can't this evaluate the inverse of `StartsWith`? If the min and max values exclude the prefix, then this should be able to filter. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21615: [SPARK-24552][core][sql] Use unique id instead of attemp...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21615 +1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21606: [SPARK-24552][core][SQL] Use task ID instead of a...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21606#discussion_r197552309 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -125,11 +124,11 @@ object DataWritingSparkTask extends Logging { val coordinator = SparkEnv.get.outputCommitCoordinator val commitAuthorized = coordinator.canCommit(stageId, stageAttempt, partId, attemptId) if (commitAuthorized) { - logInfo(s"Writer for stage $stageId / $stageAttempt, " + + logInfo(s"Writer for stage $stageId.$stageAttempt, " + s"task $partId.$attemptId is authorized to commit.") dataWriter.commit() } else { - val message = s"Stage $stageId / $stageAttempt, " + + val message = s"Stage $stageId.$stageAttempt, " + --- End diff -- +1 Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21606: [SPARK-24552][core][SQL] Use task ID instead of attempt ...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21606 +1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21606: [SPARK-24552][core][SQL] Use task ID instead of a...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21606#discussion_r197547079 --- Diff: core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala --- @@ -76,13 +76,17 @@ object SparkHadoopWriter extends Logging { // Try to write all RDD partitions as a Hadoop OutputFormat. try { val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => { +// SPARK-24552: Generate a unique "attempt ID" based on the stage and task atempt numbers. +// Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently. +val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber --- End diff -- Okay, that makes sense if this is just for Hadoop attempt IDs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21606: [SPARK-24552][core][SQL] Use task ID instead of a...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21606#discussion_r197543585 --- Diff: core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala --- @@ -104,12 +104,12 @@ object SparkHadoopWriter extends Logging { jobTrackerId: String, commitJobId: Int, sparkPartitionId: Int, - sparkAttemptNumber: Int, + sparkTaskId: Long, committer: FileCommitProtocol, iterator: Iterator[(K, V)]): TaskCommitMessage = { // Set up a task. val taskContext = config.createTaskAttemptContext( - jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber) + jobTrackerId, commitJobId, sparkPartitionId, sparkTaskId.toInt) --- End diff -- To backport this, can we use the `.toInt` version? I think that should be safe. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21577: [SPARK-24589][core] Correctly identify tasks in output c...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21577 Thanks for fixing this, @vanzin! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21606: [SPARK-24552][core][SQL] Use task ID instead of a...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21606#discussion_r197542830 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -125,11 +124,11 @@ object DataWritingSparkTask extends Logging { val coordinator = SparkEnv.get.outputCommitCoordinator val commitAuthorized = coordinator.canCommit(stageId, stageAttempt, partId, attemptId) if (commitAuthorized) { - logInfo(s"Writer for stage $stageId / $stageAttempt, " + + logInfo(s"Writer for stage $stageId.$stageAttempt, " + s"task $partId.$attemptId is authorized to commit.") dataWriter.commit() } else { - val message = s"Stage $stageId / $stageAttempt, " + + val message = s"Stage $stageId.$stageAttempt, " + --- End diff -- (This is for the next line, sorry for the confusion) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21606: [SPARK-24552][core][SQL] Use task ID instead of a...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21606#discussion_r197542704 --- Diff: core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala --- @@ -104,12 +104,12 @@ object SparkHadoopWriter extends Logging { jobTrackerId: String, commitJobId: Int, sparkPartitionId: Int, - sparkAttemptNumber: Int, + sparkTaskId: Long, committer: FileCommitProtocol, iterator: Iterator[(K, V)]): TaskCommitMessage = { // Set up a task. val taskContext = config.createTaskAttemptContext( - jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber) + jobTrackerId, commitJobId, sparkPartitionId, sparkTaskId.toInt) --- End diff -- I commented before I saw this thread, but I think it is better to use the TID because that is already exposed in the UI, so it is better for tracking between UI tasks and logs. The combined attempt number isn't used anywhere, so this would introduce another number to identify a task. And shifting by 16 means that these grow huge anyway. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21606: [SPARK-24552][core][SQL] Use task ID instead of a...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21606#discussion_r197542014 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java --- @@ -42,15 +42,12 @@ *Usually Spark processes many RDD partitions at the same time, *implementations should use the partition id to distinguish writers for *different partitions. - * @param attemptNumber Spark may launch multiple tasks with the same task id. For example, a task - * failed, Spark launches a new task wth the same task id but different - * attempt number. Or a task is too slow, Spark launches new tasks wth the - * same task id but different attempt number, which means there are multiple - * tasks with the same task id running at the same time. Implementations can - * use this attempt number to distinguish writers of different task attempts. + * @param taskId A unique identifier for a task that is performing the write of the partition + * data. Spark may run multiple tasks for the same partition (due to speculation + * or task failures, for example). * @param epochId A monotonically increasing id for streaming queries that are split in to *discrete periods of execution. For non-streaming queries, *this ID will always be 0. */ - DataWriter createDataWriter(int partitionId, int attemptNumber, long epochId); + DataWriter createDataWriter(int partitionId, int taskId, long epochId); --- End diff -- +1 for the type change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21606: [SPARK-24552][core][SQL] Use task ID instead of a...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21606#discussion_r197541490 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -125,11 +124,11 @@ object DataWritingSparkTask extends Logging { val coordinator = SparkEnv.get.outputCommitCoordinator val commitAuthorized = coordinator.canCommit(stageId, stageAttempt, partId, attemptId) if (commitAuthorized) { - logInfo(s"Writer for stage $stageId / $stageAttempt, " + + logInfo(s"Writer for stage $stageId.$stageAttempt, " + s"task $partId.$attemptId is authorized to commit.") dataWriter.commit() } else { - val message = s"Stage $stageId / $stageAttempt, " + + val message = s"Stage $stageId.$stageAttempt, " + --- End diff -- Should these logs use TID instead of attempt number? The format used in other log messages is `s"Task $taskId (TID $tid)"`, I think. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21606: [SPARK-24552][core][SQL] Use task ID instead of a...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21606#discussion_r197540970 --- Diff: core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala --- @@ -76,13 +76,17 @@ object SparkHadoopWriter extends Logging { // Try to write all RDD partitions as a Hadoop OutputFormat. try { val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => { +// SPARK-24552: Generate a unique "attempt ID" based on the stage and task atempt numbers. +// Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently. +val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber --- End diff -- I don't think we should generate an ID this way. We already have a unique ID that is exposed in the Spark UI. I'd much rather make it clear that the TID passed to committers as an attempt ID is the same as the TID in the stage view. That makes debugging easier. Going with this approach just introduces yet another number to track an attempt. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
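For comparison, a minimal sketch of the alternative suggested above (illustrative only; `context` is the `TaskContext` from the surrounding function):
```
// Use the globally unique Spark task attempt id (the TID shown in the UI) rather than
// composing stage attempt and task attempt numbers into a synthetic id.
val attemptId: Long = context.taskAttemptId()

// Callers that must hand Hadoop an int (e.g. when building a TaskAttemptID) would truncate:
val hadoopAttemptId: Int = attemptId.toInt
```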
[GitHub] spark issue #21558: [SPARK-24552][SQL] Use task ID instead of attempt number...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21558 @vanzin, thanks for working on this. I was out most of this week at a conference and I'm still on just half time, which is why I was delayed. Sorry to leave you all waiting. I'll make comments on your PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21558: [SPARK-24552][SQL] Use task ID instead of attempt number...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21558 Yes, I just checked and speculative attempts do get a different TID. Just turn on speculation, run a large stage, and sort tasks in a stage by TID. There aren't duplicates. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21558: [SPARK-24552][SQL] Use task ID instead of attempt number...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21558 Retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21574: [SPARK-24478][SQL][followup] Move projection and filter ...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21574 +1 (non-binding) assuming that tests pass. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21574: [SPARK-24478][SQL][followup] Move projection and ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21574#discussion_r196223209 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -106,7 +106,7 @@ case class StreamingDataSourceV2Relation( object DataSourceV2Relation { private implicit class SourceHelpers(source: DataSourceV2) { -def asReadSupport: ReadSupport = { +private def asReadSupport: ReadSupport = { --- End diff -- Minor: these are effectively private because `SourceHelpers` is private. If we were to move `SourceHelpers` or make it public to some other part of Spark, we would have to revert these changes. So I think it is best to rely on the visibility of `SourceHelpers` instead of making these private. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21574: [SPARK-24478][SQL][followup] Move projection and ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21574#discussion_r196222500 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -23,17 +23,24 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} -import org.apache.spark.sql.execution.datasources.DataSourceStrategy -import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema} -import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics} import org.apache.spark.sql.types.StructType +/** + * A logical plan representing a data source v2 scan. + * + * @param source An instance of a [[DataSourceV2]] implementation. + * @param options The options for this scan. Used to create fresh [[DataSourceReader]]. + * @param userSpecifiedSchema The user-specified schema for this scan. Used to create fresh + *[[DataSourceReader]]. + */ case class DataSourceV2Relation( source: DataSourceV2, output: Seq[AttributeReference], options: Map[String, String], -userSpecifiedSchema: Option[StructType] = None) +userSpecifiedSchema: Option[StructType]) --- End diff -- Either way, it's up to you. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21558: [SPARK-24552][SQL] Use task ID instead of attempt number...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21558 @vanzin, the ID that this uses is the TID, which I thought was always unique. It appears to be a one-up counter. Also, I noted on your PR that both are needed because even if we only commit one of the attempts, the writers may use this ID to track and clean up data. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21577: [SPARK-24552][core] Correctly identify tasks in o...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21577#discussion_r196217742 --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala --- @@ -399,7 +399,8 @@ private[spark] object JsonProtocol { ("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~ ("Accumulator Updates" -> accumUpdates) case taskCommitDenied: TaskCommitDenied => -("Job ID" -> taskCommitDenied.jobID) ~ +("Job ID" -> taskCommitDenied.stageID) ~ +("Job Attempt Number" -> taskCommitDenied.stageAttempt) ~ --- End diff -- Also, will this affect the compatibility of the history server files? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21577: [SPARK-24552][core] Correctly identify tasks in output c...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21577 +1. This fixes the commit coordinator problem where two separate tasks can be authorized. That case could lead to duplicate data (if, for example, both tasks generated unique file names using a random UUID). However, this doesn't address the problem I hit in practice, where a file was created twice and deleted once because the same task attempt number was both allowed to commit by the coordinator and denied commit by the coordinator (after the stage had finished). We still need the solution proposed in https://github.com/apache/spark/pull/21558 for the v2 API. But that's more of a v2 API problem because that API makes the guarantee that implementations can rely on the attempt ID. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21577: [SPARK-24552][core] Correctly identify tasks in o...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21577#discussion_r196215961 --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala --- @@ -399,7 +399,8 @@ private[spark] object JsonProtocol { ("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~ ("Accumulator Updates" -> accumUpdates) case taskCommitDenied: TaskCommitDenied => -("Job ID" -> taskCommitDenied.jobID) ~ +("Job ID" -> taskCommitDenied.stageID) ~ +("Job Attempt Number" -> taskCommitDenied.stageAttempt) ~ --- End diff -- Why does this use "Job" and not "Stage"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21577: [SPARK-24552][core] Correctly identify tasks in o...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21577#discussion_r196214944 --- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala --- @@ -131,16 +139,17 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) reason match { case Success => // The task output has been committed successfully - case denied: TaskCommitDenied => -logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " + - s"attempt: $attemptNumber") - case otherReason => + case _: TaskCommitDenied => +logInfo(s"Task was denied committing, stage: $stage / $stageAttempt, " + --- End diff -- Nit: Should this be `s"$stage.$stageAttempt"`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21577: [SPARK-24552][core] Correctly identify tasks in o...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21577#discussion_r196214788 --- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala --- @@ -131,16 +139,17 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) reason match { case Success => // The task output has been committed successfully - case denied: TaskCommitDenied => -logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " + - s"attempt: $attemptNumber") - case otherReason => + case _: TaskCommitDenied => +logInfo(s"Task was denied committing, stage: $stage / $stageAttempt, " + + s"partition: $partition, attempt: $attemptNumber") + case _ => // Mark the attempt as failed to blacklist from future commit protocol -stageState.failures.getOrElseUpdate(partition, mutable.Set()) += attemptNumber -if (stageState.authorizedCommitters(partition) == attemptNumber) { +val taskId = TaskIdentifier(stageAttempt, attemptNumber) +stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId +if (stageState.authorizedCommitters(partition) == taskId) { logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " + s"partition=$partition) failed; clearing lock") - stageState.authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER + stageState.authorizedCommitters(partition) = null --- End diff -- Nit: why not use Option[TaskIdentifier] and None here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21558: [SPARK-24552][SQL] Use task ID instead of attempt number...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21558 I think the right thing to do for this commit is to use the task ID instead of the stage-local attempt number. I've updated the PR with the change so I think this is ready to commit. @vanzin, are you okay committing this? cc @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21558: [SPARK-24552][SQL] Use task ID instead of attempt number...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21558 @tgravescs, that's exactly what we're seeing. I think it might just be misleading to have a stage-local attempt ID although it is more friendly for users and matches what MR does. @jiangxb1987, we see SPARK-24492 occasionally (it has gotten better with later fixes to the coordinator) and haven't tracked down the cause yet. If this were the underlying cause that would be great, but I'm not sure how it could be the cause. If the same attempt number is reused, then two tasks in different stage attempts may both be authorized to commit. That wouldn't cause the retries because it wouldn't cause extra commit denials. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21574: [SPARK-24478][SQL][followup] Move projection and ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21574#discussion_r196192774 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -23,17 +23,24 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} -import org.apache.spark.sql.execution.datasources.DataSourceStrategy -import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema} -import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics} import org.apache.spark.sql.types.StructType +/** + * A logical plan representing a data source v2 scan. + * + * @param source An instance of a [[DataSourceV2]] implementation. + * @param options The options for this scan. Used to create fresh [[DataSourceReader]]. + * @param userSpecifiedSchema The user-specified schema for this scan. Used to create fresh + *[[DataSourceReader]]. + */ case class DataSourceV2Relation( source: DataSourceV2, output: Seq[AttributeReference], options: Map[String, String], -userSpecifiedSchema: Option[StructType] = None) +userSpecifiedSchema: Option[StructType]) --- End diff -- That's because there are few places that create v2 relations so far, but when SQL statements and other paths that don't allow you to supply your own schema are added, I think this will be more common. It's okay to remove it, but I don't see much value in the change and I like to keep non-functional changes to a minimum. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21503 Thank you for reviewing this, @cloud-fan! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21574: [SPARK-24478][SQL][followup] Move projection and ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21574#discussion_r196173414 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -23,17 +23,24 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} -import org.apache.spark.sql.execution.datasources.DataSourceStrategy -import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema} -import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics} import org.apache.spark.sql.types.StructType +/** + * A logical plan representing a data source v2 scan. + * + * @param source An instance of a [[DataSourceV2]] implementation. + * @param options The options for this scan. Used to create fresh [[DataSourceReader]]. + * @param userSpecifiedSchema The user-specified schema for this scan. Used to create fresh + *[[DataSourceReader]]. + */ case class DataSourceV2Relation( source: DataSourceV2, output: Seq[AttributeReference], options: Map[String, String], -userSpecifiedSchema: Option[StructType] = None) +userSpecifiedSchema: Option[StructType]) --- End diff -- Why is this change necessary? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org