[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202093665
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
   def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
     val nameToType = getFieldMap(schema)
 
+    def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
+      case decimal: JBigDecimal =>
+        decimal.scale == decimalMeta.getScale
+      case _ => false
+    }
+
+    // Since SPARK-24716, ParquetFilter accepts parquet file schema to convert to
+    // data source Filter. This must make sure that filter value matched the Filter.
+    // If doesn't matched, then the schema used to read the file is incorrect,
+    // which would cause data corruption.
+    def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
+      value == null || (nameToType(name) match {
+        case ParquetBooleanType => value.isInstanceOf[JBoolean]
+        case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
+        case ParquetLongType => value.isInstanceOf[JLong]
+        case ParquetFloatType => value.isInstanceOf[JFloat]
+        case ParquetDoubleType => value.isInstanceOf[JDouble]
+        case ParquetStringType => value.isInstanceOf[String]
+        case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
+        case ParquetDateType => value.isInstanceOf[Date]
--- End diff --

Not in this PR, which adds Decimal support. We should consider it in the future, though.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202093508
  
--- Diff: sql/core/benchmarks/FilterPushdownBenchmark-results.txt ---
@@ -292,120 +292,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 Select 1 decimal(9, 2) row (value = 7864320): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------
-Parquet Vectorized                             3785 / 3867          4.2         240.6       1.0X
-Parquet Vectorized (Pushdown)                  3820 / 3928          4.1         242.9       1.0X
-Native ORC Vectorized                          3981 / 4049          4.0         253.1       1.0X
-Native ORC Vectorized (Pushdown)                702 /  735         22.4          44.6       5.4X
+Parquet Vectorized                             4407 / 4852          3.6         280.2       1.0X
+Parquet Vectorized (Pushdown)                  1602 / 1634          9.8         101.8       2.8X
--- End diff --

I'm not sure I understand. That's less than 2^24, so it should fit in an 
int. It should also fit in 8 base-ten digits so decimal(9,2) should work. And 
last, if the values don't fit in an int, I'm not sure how we would be able to 
store them in the first place, regardless of how stats are handled.

Did you verify that there are no stats for the file produced here? If 
that's the case, it would make sense with these numbers. I think we just need 
to look for a different reason why stats are missing.
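
For reference, a quick standalone check of the arithmetic above (an illustrative Scala snippet, not code from the PR or the benchmark):

```
// Sanity check: the benchmark value fits in an Int (< 2^24) and its
// decimal(9, 2) unscaled value fits in a signed 32-bit int, which is the
// INT32 physical representation this PR pushes down.
object DecimalFitCheck extends App {
  val value = 7864320L
  assert(value < (1L << 24))  // less than 2^24, so it fits exactly in an Int
  val unscaled = java.math.BigDecimal.valueOf(value).setScale(2).unscaledValue()
  assert(unscaled.bitLength() < 32)  // 786432000 fits in INT32
  println(s"unscaled = $unscaled, bitLength = ${unscaled.bitLength()}")
}
```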


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r202090024
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -248,29 +371,29 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
     // Probably I missed something and obviously this should be changed.
 
     predicate match {
-      case sources.IsNull(name) if canMakeFilterOn(name) =>
+      case sources.IsNull(name) if canMakeFilterOn(name, null) =>
         makeEq.lift(nameToType(name)).map(_(name, null))
-      case sources.IsNotNull(name) if canMakeFilterOn(name) =>
+      case sources.IsNotNull(name) if canMakeFilterOn(name, null) =>
         makeNotEq.lift(nameToType(name)).map(_(name, null))
 
-      case sources.EqualTo(name, value) if canMakeFilterOn(name) =>
+      case sources.EqualTo(name, value) if canMakeFilterOn(name, value) =>
         makeEq.lift(nameToType(name)).map(_(name, value))
-      case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name) =>
+      case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) =>
         makeNotEq.lift(nameToType(name)).map(_(name, value))
 
-      case sources.EqualNullSafe(name, value) if canMakeFilterOn(name) =>
+      case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) =>
         makeEq.lift(nameToType(name)).map(_(name, value))
-      case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name) =>
+      case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) =>
         makeNotEq.lift(nameToType(name)).map(_(name, value))
--- End diff --

Maybe I'm missing something, but that returns true for all null values.


---




[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...

2018-07-11 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21118
  
@cloud-fan, I'd like to get this PR in by 2.4.0. Now that the change to 
push predicates and projections happens when converting to the physical plan, 
this can go in. I've rebased this on master and updated it.

This changes the DSv2 API to primarily use InternalRow. It ensures that the 
rows are UnsafeRow by adding a projection on top of the physical scan node. 
This projection is actually *more* efficient than the current read path because 
filters are run before the projection. This means, for example, that the 
Parquet reader can avoid those two projections that currently happen in the 
scan node.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201763831
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -248,29 +371,29 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
     // Probably I missed something and obviously this should be changed.
 
     predicate match {
-      case sources.IsNull(name) if canMakeFilterOn(name) =>
+      case sources.IsNull(name) if canMakeFilterOn(name, null) =>
         makeEq.lift(nameToType(name)).map(_(name, null))
-      case sources.IsNotNull(name) if canMakeFilterOn(name) =>
+      case sources.IsNotNull(name) if canMakeFilterOn(name, null) =>
         makeNotEq.lift(nameToType(name)).map(_(name, null))
 
-      case sources.EqualTo(name, value) if canMakeFilterOn(name) =>
+      case sources.EqualTo(name, value) if canMakeFilterOn(name, value) =>
         makeEq.lift(nameToType(name)).map(_(name, value))
-      case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name) =>
+      case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) =>
         makeNotEq.lift(nameToType(name)).map(_(name, value))
 
-      case sources.EqualNullSafe(name, value) if canMakeFilterOn(name) =>
+      case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) =>
         makeEq.lift(nameToType(name)).map(_(name, value))
-      case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name) =>
+      case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) =>
         makeNotEq.lift(nameToType(name)).map(_(name, value))
--- End diff --

Since makeNotEq is also used for EqualNullSafe, I think it should handle 
null values as well.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201763489
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -37,41 +39,64 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
-private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
+private[parquet] class ParquetFilters(
+pushDownDate: Boolean,
+pushDownDecimal: Boolean,
+pushDownStartWith: Boolean) {
 
   private case class ParquetSchemaType(
   originalType: OriginalType,
   primitiveTypeName: PrimitiveTypeName,
-  decimalMetadata: DecimalMetadata)
-
-  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
-  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, null)
-  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, null)
-  private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
-  private val ParquetLongType = ParquetSchemaType(null, INT64, null)
-  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
-  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
-  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
-  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
-  private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
+  length: Int,
+  decimalMeta: DecimalMetadata)
+
+  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, 0, 
null)
+  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, 0, null)
+  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, 0, null)
+  private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0, null)
+  private val ParquetLongType = ParquetSchemaType(null, INT64, 0, null)
+  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, 0, null)
+  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, 0, null)
+  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, 0, null)
+  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, 0, null)
+  private val ParquetDateType = ParquetSchemaType(DATE, INT32, 0, null)
 
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
+  private def decimalToInt32(decimal: JBigDecimal): Integer = 
decimal.unscaledValue().intValue()
+
+  private def decimalToInt64(decimal: JBigDecimal): JLong = 
decimal.unscaledValue().longValue()
+
+  private def decimalToByteArray(decimal: JBigDecimal, numBytes: Int): 
Binary = {
+val decimalBuffer = new Array[Byte](numBytes)
+val bytes = decimal.unscaledValue().toByteArray
+
+val fixedLengthBytes = if (bytes.length == numBytes) {
+  bytes
+} else {
+  val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
+  java.util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, 
signByte)
+  System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, 
bytes.length)
+  decimalBuffer
+}
+Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes)
+  }
+
   private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
--- End diff --

Since `makeEq` is called for `EqualNullSafe` and `valueCanMakeFilterOn` 
allows null values through, I think these values could be null, just like in the 
String case. This should use the `Option` pattern from the String case for all 
value types, unless I'm missing some reason why these will never be null.
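
A minimal sketch of that `Option` pattern applied to one case (the helper name is hypothetical, not the PR's exact code); a null value from `EqualNullSafe` then maps to `eq(column, null)` instead of a failed cast:

```
import java.math.{BigDecimal => JBigDecimal}
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.filter2.predicate.FilterApi.intColumn

// Sketch only: an eq predicate builder for the INT32 decimal case that
// tolerates null, as EqualNullSafe(name, null) requires. Option(...) skips
// the unscaled-value conversion when the value is null.
def makeDecimal32Eq(name: String, value: Any): FilterPredicate =
  FilterApi.eq(
    intColumn(name),
    Option(value)
      .map(v => java.lang.Integer.valueOf(v.asInstanceOf[JBigDecimal].unscaledValue().intValue()))
      .orNull)
```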


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201761849
  
--- Diff: sql/core/benchmarks/FilterPushdownBenchmark-results.txt ---
@@ -292,120 +292,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 
 Select 1 decimal(9, 2) row (value = 7864320): Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------
-Parquet Vectorized                             3785 / 3867          4.2         240.6       1.0X
-Parquet Vectorized (Pushdown)                  3820 / 3928          4.1         242.9       1.0X
-Native ORC Vectorized                          3981 / 4049          4.0         253.1       1.0X
-Native ORC Vectorized (Pushdown)                702 /  735         22.4          44.6       5.4X
+Parquet Vectorized                             4407 / 4852          3.6         280.2       1.0X
+Parquet Vectorized (Pushdown)                  1602 / 1634          9.8         101.8       2.8X
--- End diff --

Maybe it is that the data is more dense, so we need to read more values in 
the row group that contains the one we're looking for?


---




[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-11 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21305
  
I don't think we need (or want) `SaveMode` passed to writers after 
standardizing. Uses of `WriteSupport` will always append data to an existing 
table, which makes it simpler for writers. And it will be used for all writes.

A couple other notes:
* We will also need a `StagedTable` variant of `DeleteSupport` to support 
`ReplaceData` as an atomic operation, but I want to get the non-atomic variants 
in first; hopefully for 2.4.0.
* RTAS would use `dropTable` and not `DeleteSupport`, since it is at the 
table level.
* We may still use `SaveMode` in the DF writer API, which is still under 
discussion.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201756667
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -37,41 +39,64 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
-private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
+private[parquet] class ParquetFilters(
+pushDownDate: Boolean,
+pushDownDecimal: Boolean,
+pushDownStartWith: Boolean) {
 
   private case class ParquetSchemaType(
   originalType: OriginalType,
   primitiveTypeName: PrimitiveTypeName,
-  decimalMetadata: DecimalMetadata)
-
-  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
-  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, null)
-  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, null)
-  private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
-  private val ParquetLongType = ParquetSchemaType(null, INT64, null)
-  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
-  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
-  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
-  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
-  private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
+  length: Int,
+  decimalMeta: DecimalMetadata)
+
+  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, 0, 
null)
+  private val ParquetByteType = ParquetSchemaType(INT_8, INT32, 0, null)
+  private val ParquetShortType = ParquetSchemaType(INT_16, INT32, 0, null)
+  private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0, null)
+  private val ParquetLongType = ParquetSchemaType(null, INT64, 0, null)
+  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, 0, null)
+  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, 0, null)
+  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, 0, null)
+  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, 0, null)
+  private val ParquetDateType = ParquetSchemaType(DATE, INT32, 0, null)
 
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
+  private def decimalToInt32(decimal: JBigDecimal): Integer = 
decimal.unscaledValue().intValue()
+
+  private def decimalToInt64(decimal: JBigDecimal): JLong = 
decimal.unscaledValue().longValue()
+
+  private def decimalToByteArray(decimal: JBigDecimal, numBytes: Int): 
Binary = {
+val decimalBuffer = new Array[Byte](numBytes)
+val bytes = decimal.unscaledValue().toByteArray
+
+val fixedLengthBytes = if (bytes.length == numBytes) {
+  bytes
+} else {
+  val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
+  java.util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, 
signByte)
+  System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, 
bytes.length)
+  decimalBuffer
+}
+Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes)
--- End diff --

This byte array is not reused; it is allocated each time this function 
runs. This should use the `fromConstantByteArray` variant instead.
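
A minimal sketch of the suggested call, assuming the `fixedLengthBytes` buffer built in the diff above:

```
import org.apache.parquet.io.api.Binary

// Sketch of the suggested fix: the fixed-length buffer is freshly allocated on
// every call and never mutated afterwards, so it can be wrapped as a constant.
// fromReusedByteArray signals that the caller may overwrite the array later,
// which forces Parquet to copy it defensively.
def toFixedLenBinary(fixedLengthBytes: Array[Byte], numBytes: Int): Binary =
  Binary.fromConstantByteArray(fixedLengthBytes, 0, numBytes)
```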


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201755545
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
   def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
     val nameToType = getFieldMap(schema)
 
+    def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
+      case decimal: JBigDecimal =>
+        decimal.scale == decimalMeta.getScale
+      case _ => false
+    }
+
+    // Since SPARK-24716, ParquetFilter accepts parquet file schema to convert to
+    // data source Filter. This must make sure that filter value matched the Filter.
+    // If doesn't matched, then the schema used to read the file is incorrect,
+    // which would cause data corruption.
+    def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
+      value == null || (nameToType(name) match {
+        case ParquetBooleanType => value.isInstanceOf[JBoolean]
+        case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
+        case ParquetLongType => value.isInstanceOf[JLong]
+        case ParquetFloatType => value.isInstanceOf[JFloat]
+        case ParquetDoubleType => value.isInstanceOf[JDouble]
+        case ParquetStringType => value.isInstanceOf[String]
+        case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
+        case ParquetDateType => value.isInstanceOf[Date]
--- End diff --

Why is there no support for timestamp?


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201755353
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
   def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
     val nameToType = getFieldMap(schema)
 
+    def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
+      case decimal: JBigDecimal =>
+        decimal.scale == decimalMeta.getScale
+      case _ => false
+    }
+
+    // Since SPARK-24716, ParquetFilter accepts parquet file schema to convert to
+    // data source Filter. This must make sure that filter value matched the Filter.
+    // If doesn't matched, then the schema used to read the file is incorrect,
+    // which would cause data corruption.
+    def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
+      value == null || (nameToType(name) match {
+        case ParquetBooleanType => value.isInstanceOf[JBoolean]
+        case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
+        case ParquetLongType => value.isInstanceOf[JLong]
+        case ParquetFloatType => value.isInstanceOf[JFloat]
+        case ParquetDoubleType => value.isInstanceOf[JDouble]
+        case ParquetStringType => value.isInstanceOf[String]
+        case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
+        case ParquetDateType => value.isInstanceOf[Date]
+        case ParquetSchemaType(DECIMAL, INT32, 0, decimalMeta) =>
--- End diff --

Can the decimal cases be collapsed to a single case on 
`ParquetSchemaType(DECIMAL, _, _, decimalMetadata)`?
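
A rough, self-contained sketch of the collapsed arm being asked about; `SchemaType` here is a stand-in for the PR's private `ParquetSchemaType`, so the names are illustrative only:

```
import java.math.{BigDecimal => JBigDecimal}
import org.apache.parquet.schema.{DecimalMetadata, OriginalType}
import org.apache.parquet.schema.OriginalType.DECIMAL
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

// Stand-in for the PR's ParquetSchemaType, only to show the single-arm match.
case class SchemaType(
    originalType: OriginalType,
    primitiveTypeName: PrimitiveTypeName,
    length: Int,
    decimalMeta: DecimalMetadata)

// One case covers INT32, INT64, and FIXED_LEN_BYTE_ARRAY decimals: only the
// logical type and the scale matter for the value check.
def decimalValueMatches(t: SchemaType, value: Any): Boolean = t match {
  case SchemaType(DECIMAL, _, _, meta) =>
    value.isInstanceOf[JBigDecimal] && value.asInstanceOf[JBigDecimal].scale == meta.getScale
  case _ => false
}
```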


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201754882
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
   def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
     val nameToType = getFieldMap(schema)
 
+    def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
+      case decimal: JBigDecimal =>
+        decimal.scale == decimalMeta.getScale
+      case _ => false
+    }
+
+    // Since SPARK-24716, ParquetFilter accepts parquet file schema to convert to
--- End diff --

Is this issue reference correct? The PR says this is for SPARK-24549.


---




[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r201754998
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -202,6 +283,16 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
     case ParquetDateType if pushDownDate =>
       (n: String, v: Any) =>
         FilterApi.gtEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
+
+    case ParquetSchemaType(DECIMAL, INT32, 0, _) if pushDownDecimal =>
--- End diff --

Why match 0 instead of _?


---




[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-11 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21305
  
@cloud-fan, yes. There is an open PR, #21308, that adds `DeleteSupport`. 
I'm not pushing for that just yet because I think `DeleteSupport` should be 
applied to `Table` after #21306 makes it in.


---




[GitHub] spark pull request #20933: [SPARK-23817][SQL]Migrate ORC file format read pa...

2018-07-10 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20933#discussion_r201427780
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -241,39 +240,47 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 val cls = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
 if (classOf[DataSourceV2].isAssignableFrom(cls)) {
   val ds = cls.newInstance()
-  ds match {
-case ws: WriteSupport =>
-  val options = new DataSourceOptions((extraOptions ++
-DataSourceV2Utils.extractSessionConfigs(
-  ds = ds.asInstanceOf[DataSourceV2],
-  conf = df.sparkSession.sessionState.conf)).asJava)
-  // Using a timestamp and a random UUID to distinguish different 
writing jobs. This is good
-  // enough as there won't be tons of writing jobs created at the 
same second.
-  val jobId = new SimpleDateFormat("MMddHHmmss", Locale.US)
-.format(new Date()) + "-" + UUID.randomUUID()
-  val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, 
options)
-  if (writer.isPresent) {
-runCommand(df.sparkSession, "save") {
-  WriteToDataSourceV2(writer.get(), df.logicalPlan)
-}
-  }
+  val (needToFallBackFileDataSourceV2, fallBackFileFormat) = ds match {
+case f: FileDataSourceV2 =>
+  val disabledV2Readers =
+
df.sparkSession.sessionState.conf.disabledV2FileDataSourceWriter.split(",")
+  (disabledV2Readers.contains(f.shortName), 
f.fallBackFileFormat.getCanonicalName)
+case _ => (false, source)
+  }
 
-// Streaming also uses the data source V2 API. So it may be that 
the data source implements
-// v2, but has no v2 implementation for batch writes. In that 
case, we fall back to saving
-// as though it's a V1 source.
-case _ => saveToV1Source()
+  if (ds.isInstanceOf[WriteSupport] && 
!needToFallBackFileDataSourceV2) {
+val options = new DataSourceOptions((extraOptions ++
+  DataSourceV2Utils.extractSessionConfigs(
+ds = ds.asInstanceOf[DataSourceV2],
+conf = df.sparkSession.sessionState.conf)).asJava)
+// Using a timestamp and a random UUID to distinguish different 
writing jobs. This is good
+// enough as there won't be tons of writing jobs created at the 
same second.
+val jobId = new SimpleDateFormat("MMddHHmmss", Locale.US)
+  .format(new Date()) + "-" + UUID.randomUUID()
+val writer = ds.asInstanceOf[WriteSupport]
+  .createWriter(jobId, df.logicalPlan.schema, mode, options)
--- End diff --

It is. We're still evolving the v2 API and integration with Spark. This 
problem is addressed in PR #21305, which is the first of a series of changes to 
standardize the logical plans and fix problems like this one.

There's also an [open proposal for those 
changes](https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d).


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-10 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r201399506
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -240,21 +238,27 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
 val cls = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
 if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-  val ds = cls.newInstance()
-  ds match {
+  val source = cls.newInstance().asInstanceOf[DataSourceV2]
+  source match {
 case ws: WriteSupport =>
-  val options = new DataSourceOptions((extraOptions ++
-DataSourceV2Utils.extractSessionConfigs(
-  ds = ds.asInstanceOf[DataSourceV2],
-  conf = df.sparkSession.sessionState.conf)).asJava)
-  // Using a timestamp and a random UUID to distinguish different 
writing jobs. This is good
-  // enough as there won't be tons of writing jobs created at the 
same second.
-  val jobId = new SimpleDateFormat("MMddHHmmss", Locale.US)
-.format(new Date()) + "-" + UUID.randomUUID()
-  val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, 
options)
-  if (writer.isPresent) {
+  val options = extraOptions ++
+  DataSourceV2Utils.extractSessionConfigs(source, 
df.sparkSession.sessionState.conf)
+
+  val relation = DataSourceV2Relation.create(source, options.toMap)
+  if (mode == SaveMode.Append) {
 runCommand(df.sparkSession, "save") {
-  WriteToDataSourceV2(writer.get(), df.logicalPlan)
+  AppendData.byName(relation, df.logicalPlan)
+}
+
+  } else {
+val writer = ws.createWriter(
+  UUID.randomUUID.toString, 
df.logicalPlan.output.toStructType, mode,
--- End diff --

I see no good reason to over-complicate the unique string passed in. Here's 
a quote from Wikipedia on the chance of a collision (from [this SO 
answer](https://stackoverflow.com/questions/24876188/how-big-is-the-chance-to-get-a-java-uuid-randomuuid-collision)):

```
Only after generating 1 billion UUIDs every second for the next 100 years, 
the probability of creating just one duplicate would be about 50%. Or, to put 
it another way, the probability of one duplicate would be about 50% if every 
person on earth owned 600 million UUIDs.
```

Adding timestamp to a UUID to avoid collisions is unnecessary.

For the other use, why would a user go to the temp directory of some node's 
file system -- which may not even be used by a given source -- instead of going 
to the logs? What if the user wants any other piece of information besides the 
starting timestamp (that's in some format that has to be converted)?

In short, I don't agree with the argument that it is helpful to pass the 
old format. It is just a carry-over from making fake Hadoop job IDs (which is 
why it was called `jobId` and started with `job_`). It's debatable whether the 
write UUID itself is even useful, given that there is no requirement to use it 
anywhere.
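
For concreteness, a small illustrative sketch of the two identifier styles under discussion (the timestamp pattern is approximated, not quoted from the code):

```
import java.text.SimpleDateFormat
import java.util.{Date, Locale, UUID}

// The old fake "job ID": a timestamp prefix plus a random UUID, mimicking
// Hadoop job IDs. The timestamp adds no real collision protection.
val oldStyleJobId =
  new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date()) + "-" + UUID.randomUUID()

// The proposed write UUID: the random UUID alone already identifies the write.
val writeUUID = UUID.randomUUID.toString
```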


---




[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-09 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21305
  
@cloud-fan, I think we can ignore that last test failure because tests are 
passing on the last commit that made real changes. The latest commit only 
changed a comment.


---




[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-09 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21305
  
Retest this please.


---




[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-09 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21305
  
@cloud-fan, I've updated this and the tests are passing, so I think it is 
ready for another look.

I just pushed a comments-only commit to fix the Javadoc for AppendData that 
@viirya pointed out (thanks!). Since that's only comments, it shouldn't affect 
test results.

I think there's just one more point under discussion, which is the change 
from `jobId` (which isn't one) to `jobUUID` and dropping the timestamp. I don't 
think a timestamp helps avoid conflicts because it is astronomically unlikely 
that UUIDs will collide.


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r200825129
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -240,21 +238,27 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
 val cls = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
 if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-  val ds = cls.newInstance()
-  ds match {
+  val source = cls.newInstance().asInstanceOf[DataSourceV2]
+  source match {
 case ws: WriteSupport =>
-  val options = new DataSourceOptions((extraOptions ++
-DataSourceV2Utils.extractSessionConfigs(
-  ds = ds.asInstanceOf[DataSourceV2],
-  conf = df.sparkSession.sessionState.conf)).asJava)
-  // Using a timestamp and a random UUID to distinguish different 
writing jobs. This is good
-  // enough as there won't be tons of writing jobs created at the 
same second.
-  val jobId = new SimpleDateFormat("MMddHHmmss", Locale.US)
-.format(new Date()) + "-" + UUID.randomUUID()
-  val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, 
options)
-  if (writer.isPresent) {
+  val options = extraOptions ++
+  DataSourceV2Utils.extractSessionConfigs(source, 
df.sparkSession.sessionState.conf)
+
+  val relation = DataSourceV2Relation.create(source, options.toMap)
+  if (mode == SaveMode.Append) {
 runCommand(df.sparkSession, "save") {
-  WriteToDataSourceV2(writer.get(), df.logicalPlan)
+  AppendData.byName(relation, df.logicalPlan)
+}
+
+  } else {
+val writer = ws.createWriter(
+  UUID.randomUUID.toString, 
df.logicalPlan.output.toStructType, mode,
--- End diff --

How would random UUIDs conflict?


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r200824906
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java ---
@@ -38,15 +38,16 @@
    * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    * submitted.
    *
-   * @param jobId A unique string for the writing job. It's possible that there are many writing
-   *              jobs running at the same time, and the returned {@link DataSourceWriter} can
-   *              use this job id to distinguish itself from other jobs.
+   * @param writeUUID A unique string for the writing job. It's possible that there are many writing
--- End diff --

This is not the ID of the Spark job that is writing. I think the UUID name 
is clearer about what is actually passed: a unique string that identifies 
the write. There's also no need to make the string more complicated than a UUID, 
since there are no guarantees about it.


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r200824639
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2120,6 +2122,99 @@ class Analyzer(
 }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. 
This rule will:
+   *
+   * - Reorder columns when the write is by name
+   * - Insert safe casts when data types do not match
+   * - Insert aliases when column names do not match
+   * - Detect plans that are not compatible with the output table and 
throw AnalysisException
+   */
+  object ResolveOutputRelation extends Rule[LogicalPlan] {
+override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  case append @ AppendData(table: NamedRelation, query, isByName)
+  if table.resolved && query.resolved && !append.resolved =>
+val projection = resolveOutputColumns(table.name, table.output, 
query, isByName)
+
+if (projection != query) {
+  append.copy(query = projection)
+} else {
+  append
+}
+}
+
+def resolveOutputColumns(
+tableName: String,
+expected: Seq[Attribute],
+query: LogicalPlan,
+byName: Boolean): LogicalPlan = {
+
+  if (expected.size < query.output.size) {
+throw new AnalysisException(
+  s"""Cannot write to '$tableName', too many data columns:
+ |Table columns: ${expected.map(_.name).mkString(", ")}
+ |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+  }
+
+  val errors = new mutable.ArrayBuffer[String]()
+  val resolved: Seq[NamedExpression] = if (byName) {
+expected.flatMap { outAttr =>
+  query.resolveQuoted(outAttr.name, resolver) match {
+case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
+  errors += s"Cannot write nullable values to non-null column 
'${outAttr.name}'"
+  None
+
+case Some(inAttr) if 
!outAttr.dataType.sameType(inAttr.dataType) =>
+  Some(upcast(inAttr, outAttr))
+
+case Some(inAttr) =>
+  Some(inAttr) // matches nullability, datatype, and name
+
+case _ =>
+  errors += s"Cannot find data for output column 
'${outAttr.name}'"
+  None
+  }
+}
+
+  } else {
+if (expected.size > query.output.size) {
--- End diff --

That check is the other direction: not enough columns.

When matching by position, we need the same number of columns, so we add 
this check (the earlier check already rejects plans with too many data columns, 
so this one catches too few). When matching by name, we can call out the 
specific columns that are missing, which is why we do the validation differently 
for the two cases.
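
A condensed, hypothetical sketch of that two-sided arity check (not the rule's actual code):

```
// Illustrative helper: too many data columns is always an error; too few is
// only checked for by-position writes, because by-name resolution reports
// each missing column by name instead.
def arityErrors(tableCols: Seq[String], dataCols: Seq[String], byName: Boolean): Seq[String] = {
  val errors = Seq.newBuilder[String]
  if (dataCols.size > tableCols.size) {
    errors += s"too many data columns: ${dataCols.mkString(", ")}"
  }
  if (!byName && dataCols.size < tableCols.size) {
    errors += s"not enough data columns for a by-position write: ${dataCols.mkString(", ")}"
  }
  errors.result()
}
```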


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r200824599
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2120,6 +2122,99 @@ class Analyzer(
 }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. 
This rule will:
+   *
+   * - Reorder columns when the write is by name
+   * - Insert safe casts when data types do not match
+   * - Insert aliases when column names do not match
+   * - Detect plans that are not compatible with the output table and 
throw AnalysisException
+   */
+  object ResolveOutputRelation extends Rule[LogicalPlan] {
+override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  case append @ AppendData(table: NamedRelation, query, isByName)
+  if table.resolved && query.resolved && !append.resolved =>
+val projection = resolveOutputColumns(table.name, table.output, 
query, isByName)
+
+if (projection != query) {
+  append.copy(query = projection)
+} else {
+  append
+}
+}
+
+def resolveOutputColumns(
+tableName: String,
+expected: Seq[Attribute],
+query: LogicalPlan,
+byName: Boolean): LogicalPlan = {
+
+  if (expected.size < query.output.size) {
+throw new AnalysisException(
+  s"""Cannot write to '$tableName', too many data columns:
+ |Table columns: ${expected.map(_.name).mkString(", ")}
+ |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+  }
+
+  val errors = new mutable.ArrayBuffer[String]()
+  val resolved: Seq[NamedExpression] = if (byName) {
+expected.flatMap { outAttr =>
+  query.resolveQuoted(outAttr.name, resolver) match {
+case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
+  errors += s"Cannot write nullable values to non-null column 
'${outAttr.name}'"
--- End diff --

I would much rather have a job fail fast and give a clear error message 
than to fail during a write. I can see how adding such an assertion to the plan 
could be useful, so I'd consider it if someone wanted to add that feature 
later. Right now, though, I think this is good.


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r200824602
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2120,6 +2122,99 @@ class Analyzer(
 }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. 
This rule will:
+   *
+   * - Reorder columns when the write is by name
+   * - Insert safe casts when data types do not match
+   * - Insert aliases when column names do not match
+   * - Detect plans that are not compatible with the output table and 
throw AnalysisException
+   */
+  object ResolveOutputRelation extends Rule[LogicalPlan] {
+override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  case append @ AppendData(table: NamedRelation, query, isByName)
+  if table.resolved && query.resolved && !append.resolved =>
+val projection = resolveOutputColumns(table.name, table.output, 
query, isByName)
+
+if (projection != query) {
+  append.copy(query = projection)
+} else {
+  append
+}
+}
+
+def resolveOutputColumns(
+tableName: String,
+expected: Seq[Attribute],
+query: LogicalPlan,
+byName: Boolean): LogicalPlan = {
+
+  if (expected.size < query.output.size) {
+throw new AnalysisException(
+  s"""Cannot write to '$tableName', too many data columns:
+ |Table columns: ${expected.map(_.name).mkString(", ")}
+ |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+  }
+
+  val errors = new mutable.ArrayBuffer[String]()
+  val resolved: Seq[NamedExpression] = if (byName) {
+expected.flatMap { outAttr =>
+  query.resolveQuoted(outAttr.name, resolver) match {
+case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
+  errors += s"Cannot write nullable values to non-null column 
'${outAttr.name}'"
+  None
+
+case Some(inAttr) if 
!outAttr.dataType.sameType(inAttr.dataType) =>
--- End diff --

Yes, I'll update to check nested fields.


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r200824532
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2120,6 +2122,99 @@ class Analyzer(
 }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. 
This rule will:
+   *
+   * - Reorder columns when the write is by name
+   * - Insert safe casts when data types do not match
+   * - Insert aliases when column names do not match
+   * - Detect plans that are not compatible with the output table and 
throw AnalysisException
+   */
+  object ResolveOutputRelation extends Rule[LogicalPlan] {
+override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  case append @ AppendData(table: NamedRelation, query, isByName)
--- End diff --

Yes, I agree.


---




[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...

2018-07-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r200824504
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableCatalog.java
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.catalog;
+
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public interface TableCatalog {
+  /**
+   * Load table metadata by {@link TableIdentifier identifier} from the 
catalog.
+   *
+   * @param ident a table identifier
+   * @return the table's metadata
+   * @throws NoSuchTableException If the table doesn't exist.
+   */
+  Table loadTable(TableIdentifier ident) throws NoSuchTableException;
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table already exists for the 
identifier
+   */
+  default Table createTable(TableIdentifier ident,
+StructType schema) throws 
TableAlreadyExistsException {
+return createTable(ident, schema, Collections.emptyList(), 
Collections.emptyMap());
+  }
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @param properties a string map of table properties
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table already exists for the 
identifier
+   */
+  default Table createTable(TableIdentifier ident,
+StructType schema,
+Map properties) throws 
TableAlreadyExistsException {
+return createTable(ident, schema, Collections.emptyList(), properties);
+  }
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @param partitions a list of expressions to use for partitioning data 
in the table
+   * @param properties a string map of table properties
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table already exists for the 
identifier
+   */
+  Table createTable(TableIdentifier ident,
+StructType schema,
+List partitions,
--- End diff --

I wouldn't say this way of passing partitioning is a new feature. It's just 
a generalization of the existing partitioning that allows us to pass any type 
of partition, whether it is bucketing or column-based.

As for open discussion, this was proposed in the SPIP that was fairly 
widely read and commented on. That SPIP was posted to the dev list a few times, 
too. I do appreciate you wanting to make sure there's been a chance for the 
community to discuss it, but there has been plenty of opportunity to comment. 
At this point, I think it's reasonable to move forward with the implementation.


---




[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-06 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21305
  
@cloud-fan, I've updated this with the requested changes. Thanks for 
looking at it!


---




[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r200711421
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -40,17 +44,24 @@ case class DataSourceV2Relation(
 source: DataSourceV2,
 output: Seq[AttributeReference],
 options: Map[String, String],
-userSpecifiedSchema: Option[StructType])
-  extends LeafNode with MultiInstanceRelation with 
DataSourceV2StringFormat {
+tableIdent: Option[TableIdentifier] = None,
+userSpecifiedSchema: Option[StructType] = None)
+  extends LeafNode with MultiInstanceRelation with NamedRelation with 
DataSourceV2StringFormat {
 
   import DataSourceV2Relation._
 
+  override def name: String = {
+tableIdent.map(_.unquotedString).getOrElse("unknown")
--- End diff --

Fixed.


---




[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...

2018-07-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r200710273
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableCatalog.java
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.catalog;
+
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public interface TableCatalog {
+  /**
+   * Load table metadata by {@link TableIdentifier identifier} from the 
catalog.
+   *
+   * @param ident a table identifier
+   * @return the table's metadata
+   * @throws NoSuchTableException If the table doesn't exist.
+   */
+  Table loadTable(TableIdentifier ident) throws NoSuchTableException;
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table already exists for the 
identifier
+   */
+  default Table createTable(TableIdentifier ident,
+StructType schema) throws 
TableAlreadyExistsException {
+return createTable(ident, schema, Collections.emptyList(), 
Collections.emptyMap());
+  }
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @param properties a string map of table properties
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table already exists for the 
identifier
+   */
+  default Table createTable(TableIdentifier ident,
+StructType schema,
+Map properties) throws 
TableAlreadyExistsException {
+return createTable(ident, schema, Collections.emptyList(), properties);
+  }
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @param partitions a list of expressions to use for partitioning data 
in the table
+   * @param properties a string map of table properties
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table already exists for the 
identifier
+   */
+  Table createTable(TableIdentifier ident,
+StructType schema,
+List partitions,
--- End diff --

Another benefit: this would allow us to translate `BUCKETED BY` clauses 
into something we can actually pass to data sources.


---




[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...

2018-07-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r200709816
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableCatalog.java
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.catalog;
+
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public interface TableCatalog {
+  /**
+   * Load table metadata by {@link TableIdentifier identifier} from the 
catalog.
+   *
+   * @param ident a table identifier
+   * @return the table's metadata
+   * @throws NoSuchTableException If the table doesn't exist.
+   */
+  Table loadTable(TableIdentifier ident) throws NoSuchTableException;
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table already exists for the 
identifier
+   */
+  default Table createTable(TableIdentifier ident,
+StructType schema) throws 
TableAlreadyExistsException {
+return createTable(ident, schema, Collections.emptyList(), 
Collections.emptyMap());
+  }
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @param properties a string map of table properties
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table already exists for the 
identifier
+   */
+  default Table createTable(TableIdentifier ident,
+StructType schema,
+Map properties) throws 
TableAlreadyExistsException {
+return createTable(ident, schema, Collections.emptyList(), properties);
+  }
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @param partitions a list of expressions to use for partitioning data 
in the table
+   * @param properties a string map of table properties
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table already exists for the 
identifier
+   */
+  Table createTable(TableIdentifier ident,
+StructType schema,
+List partitions,
--- End diff --

> The current end-user API only allows users to specify partition columns.

I think an example will help explain the use of expressions here. Right 
now, I can create a table partitioned by day like this:
```
CREATE TABLE t (ts timestamp, data string, day string) PARTITIONED BY (day)
```

Then it's up to users to supply the right values for `day` in their queries. 
I'm proposing we change that to something like the following, which uses 
expressions in the PARTITIONED BY clause instead of only allowing column names:
```
CREATE TABLE t (ts timestamp, data string) PARTITIONED BY (date(ts));
```

This can handle all identity partitioning in Hive tables today and it can 
handle bucketing.
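
To make that concrete, here is a rough sketch of how the same DDL could map onto the 
`createTable` method above. The `catalog` value, the table name, and the use of 
`functions.expr` to build the unresolved partition expression are all assumptions for 
illustration, not part of this PR.
```scala
import java.util.{Arrays, Collections}

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

// `catalog` is a hypothetical TableCatalog implementation obtained from the source.
val schema = new StructType()
  .add("ts", TimestampType)
  .add("data", StringType)

catalog.createTable(
  TableIdentifier("t"),
  schema,
  Arrays.asList(expr("date(ts)").expr),   // partition transform, not a stored column
  Collections.emptyMap[String, String]())
```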

> And why does the "partition transform" belong to a table definition?

Transforms should be passed to the table so the source can use them for the 
physical layout. In DataSourceV2, the source could be anything, so it needs to 
be the component that handles 

[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r200428354
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -172,6 +173,7 @@ class Analyzer(
   ResolveWindowOrder ::
   ResolveWindowFrame ::
   ResolveNaturalAndUsingJoin ::
+  ResolveOutputRelation ::
--- End diff --

This rule may add `Projection`, `UpCast`, and `Alias` nodes to the plan, so 
some rules in this batch should run after the output is resolved. 
`ResolveUpCast` rewrites the casts that were inserted, throws an exception if a 
cast would truncate, and therefore needs to run after this rule.

I could also create a batch just after resolution for output resolution. We 
could just run this rule and `ResolveUpCast`. I think the optimizer will handle 
collapsing `Projection` nodes and aliases are only resolved in this batch, so 
adding resolved aliases shouldn't be a problem. Would you like a separate batch 
for output resolution?
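
For reference, a minimal sketch of that alternative, as a fragment of the Analyzer's 
`batches` sequence; the batch name and placement are assumptions:
```scala
// Hypothetical fragment of Analyzer.batches: a dedicated batch right after the main
// Resolution batch that runs only the output-resolution rules.
Batch("Output Resolution", Once,
  ResolveOutputRelation,
  ResolveUpCast)
```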


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r200423733
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -40,17 +44,24 @@ case class DataSourceV2Relation(
 source: DataSourceV2,
 output: Seq[AttributeReference],
 options: Map[String, String],
-userSpecifiedSchema: Option[StructType])
-  extends LeafNode with MultiInstanceRelation with 
DataSourceV2StringFormat {
+tableIdent: Option[TableIdentifier] = None,
+userSpecifiedSchema: Option[StructType] = None)
+  extends LeafNode with MultiInstanceRelation with NamedRelation with 
DataSourceV2StringFormat {
 
   import DataSourceV2Relation._
 
+  override def name: String = {
+tableIdent.map(_.unquotedString).getOrElse("unknown")
--- End diff --

That's the name of the data source, not the name of the table. I'd be fine 
with updating this if you want to include the source name. What about 
`s"${source.name}:unknown"`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r200423206
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -2120,6 +2122,99 @@ class Analyzer(
 }
   }
 
+  /**
+   * Resolves columns of an output table from the data in a logical plan. 
This rule will:
+   *
+   * - Reorder columns when the write is by name
+   * - Insert safe casts when data types do not match
+   * - Insert aliases when column names do not match
+   * - Detect plans that are not compatible with the output table and 
throw AnalysisException
+   */
+  object ResolveOutputRelation extends Rule[LogicalPlan] {
+override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  case append @ AppendData(table: NamedRelation, query, isByName)
+  if table.resolved && query.resolved && !append.resolved =>
+val projection = resolveOutputColumns(table.name, table.output, 
query, isByName)
+
+if (projection != query) {
+  append.copy(query = projection)
+} else {
+  append
+}
+}
+
+def resolveOutputColumns(
+tableName: String,
+expected: Seq[Attribute],
+query: LogicalPlan,
+byName: Boolean): LogicalPlan = {
+
+  if (expected.size < query.output.size) {
+throw new AnalysisException(
+  s"""Cannot write to '$tableName', too many data columns:
+ |Table columns: ${expected.map(_.name).mkString(", ")}
+ |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+  }
+
+  val errors = new mutable.ArrayBuffer[String]()
+  val resolved: Seq[NamedExpression] = if (byName) {
+expected.flatMap { outAttr =>
+  query.resolveQuoted(outAttr.name, resolver) match {
+case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
+  errors += s"Cannot write nullable values to non-null column 
'${outAttr.name}'"
+  None
+
+case Some(inAttr) if 
!outAttr.dataType.sameType(inAttr.dataType) =>
+  Some(upcast(inAttr, outAttr))
+
+case Some(inAttr) =>
+  Some(inAttr) // matches nullability, datatype, and name
+
+case _ =>
+  errors += s"Cannot find data for output column 
'${outAttr.name}'"
+  None
+  }
+}
+
+  } else {
+if (expected.size > query.output.size) {
+  throw new AnalysisException(
+s"""Cannot write to '$tableName', not enough data columns:
+   |Table columns: ${expected.map(_.name).mkString(", ")}
+   |Data columns: ${query.output.map(_.name).mkString(", 
")}""".stripMargin)
+}
+
+query.output.zip(expected).flatMap {
+  case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable =>
+errors += s"Cannot write nullable values to non-null column 
'${outAttr.name}'"
+None
+
+  case (inAttr, outAttr)
+if !inAttr.dataType.sameType(outAttr.dataType) || inAttr.name 
!= outAttr.name =>
+Some(upcast(inAttr, outAttr))
+
+  case (inAttr, _) =>
+Some(inAttr) // matches nullability, datatype, and name
+}
+  }
+
+  if (errors.nonEmpty) {
+throw new AnalysisException(
+  s"Cannot write incompatible data to table '$tableName':\n- 
${errors.mkString("\n- ")}")
+  }
+
+  Project(resolved, query)
+}
+
+private def upcast(inAttr: NamedExpression, outAttr: Attribute): 
NamedExpression = {
+  Alias(
+UpCast(inAttr, outAttr.dataType, Seq()), outAttr.name
--- End diff --

The purpose of `UpCast` here is to prevent Spark from automatically 
inserting casts that could lose information, like `long` to `int` or `string` 
to `int`.

I would support the same for `string` to `boolean` to catch destructive 
problems from accidental column alignment (in SQL) or similar errors. The main 
problem here is that Spark inserts casts instead of alerting the user that 
there's a problem. When the write succeeds, it may be a while before the user 
realizes the mistake, and by then the original data can't be recovered.
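
A quick spark-shell sketch of the difference (assuming `import spark.implicits._` is 
available): `Cast` silently narrows, which is exactly what `UpCast` is meant to prevent 
here.
```scala
import org.apache.spark.sql.functions.col
import spark.implicits._

val df = Seq(3000000000L).toDF("id")        // a long value that does not fit in an int
df.select(col("id").cast("int")).show()     // Cast succeeds and silently overflows
// Writing this long column to an int table column through AppendData inserts
// UpCast(id, IntegerType) instead, and ResolveUpCast fails analysis with an error.
```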


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r200421789
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala 
---
@@ -203,33 +203,33 @@ class DataSourceV2Suite extends QueryTest with 
SharedSQLContext {
 val path = file.getCanonicalPath
 assert(spark.read.format(cls.getName).option("path", 
path).load().collect().isEmpty)
 
-spark.range(10).select('id, -'id).write.format(cls.getName)
+spark.range(10).select('id as 'i, -'id as 
'j).write.format(cls.getName)
--- End diff --

Yes. The new resolution rule validates the dataframe that will be written 
to the table.

Because this uses the `DataFrameWriter` API, columns are matched by name: 
there isn't a strong expectation of ordering in the dataframe API (e.g., 
`withColumn` doesn't specify where the new column is added).
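
A small sketch of the by-name behaviour, reusing `cls` and `path` from the quoted test 
(illustrative, not the test's exact code):
```scala
import org.apache.spark.sql.functions.col

// The columns are produced in the opposite order, but the write still lines them
// up with the table's columns by name rather than by position.
val df = spark.range(10).select((-col("id")).as("j"), col("id").as("i"))
df.write.format(cls.getName).option("path", path).save()
```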


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r200419939
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -82,6 +120,30 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+
+case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal =>
--- End diff --

That doesn't validate the value against the decimal scale from the file, 
which is what I'm suggesting. The decimal scale must match exactly, and this is 
a good place to check because this code has the file's schema. If the scale 
doesn't match, then the schema used to read this file is incorrect, which would 
cause data corruption.

In my opinion, it is better to add a check if it is cheap instead of 
debating whether or not some other part of the code covers the case. If this 
were happening per record then I would opt for a different strategy, but 
because this is at the file level it is a good idea to add it here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...

2018-07-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r200418152
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableCatalog.java
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.catalog;
+
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public interface TableCatalog {
+  /**
+   * Load table metadata by {@link TableIdentifier identifier} from the 
catalog.
+   *
+   * @param ident a table identifier
+   * @return the table's metadata
+   * @throws NoSuchTableException If the table doesn't exist.
+   */
+  Table loadTable(TableIdentifier ident) throws NoSuchTableException;
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table already exists for the 
identifier
+   */
+  default Table createTable(TableIdentifier ident,
+StructType schema) throws 
TableAlreadyExistsException {
+return createTable(ident, schema, Collections.emptyList(), 
Collections.emptyMap());
+  }
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @param properties a string map of table properties
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table already exists for the 
identifier
+   */
+  default Table createTable(TableIdentifier ident,
+StructType schema,
+Map<String, String> properties) throws 
TableAlreadyExistsException {
+return createTable(ident, schema, Collections.emptyList(), properties);
+  }
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @param partitions a list of expressions to use for partitioning data 
in the table
+   * @param properties a string map of table properties
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table already exists for the 
identifier
+   */
+  Table createTable(TableIdentifier ident,
+StructType schema,
+List<Expression> partitions,
--- End diff --

I recommend reading the proposal SPIP's "Proposed Changes" section, which 
goes into more detail than this comment can. In short, you're thinking of 
partitions as columns like Hive tables, but that is a narrow definition that 
prevents the underlying format from optimizing queries.

Partitions of a table are derived from the column data through some 
transform. For example, partitioning by day uses a day transform from a 
timestamp column: `day(ts)`. Hive doesn't keep track of that transform and 
requires queries to handle it by inserting both `ts` and `day` columns. This 
leads to a few problems, including:
* Hive has no ability to transform `ts > X` into the partition predicate 
`day >= day(X)`. Queries that don't account for the table's physical layout by 
adding partition predicates by hand will result in full table scans (see the 
sketch after this list).
* Users can insert any data they choose into the `day` partition and it is 
up to them to do it correctly.
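
A minimal sketch of the first problem, with made-up table and values: without a known 
transform, Spark cannot derive the `day` predicate from the `ts` filter, so the user 
has to add it by hand.
```scala
// Hypothetical example (table name, column names, and values are illustrative only).
spark.sql("""
  SELECT * FROM t
  WHERE ts > CAST('2018-07-01' AS TIMESTAMP)
    AND day >= '2018-07-01'  -- manual partition predicate; omitting it forces a full scan
""")
```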

Also, consider bucketing. Bucketing is 

[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-07-04 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21305
  
@cloud-fan, can you also review this PR for DataSourceV2?

This adds the first of the logical plans proposed in [SPIP: Standardize 
Logical 
Plans](https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d):
 `AppendData`. It replaces the current logical node for batch writes and adds 
schema validation.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-04 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r200181894
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -82,6 +120,30 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+
+case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal =>
--- End diff --

Since this uses the file schema, I think it should validate that the file 
uses the same scale as the value passed in. That's a cheap sanity check to 
ensure correctness.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-07-04 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r200181749
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -82,6 +120,30 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+
+case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal =>
+  (n: String, v: Any) => FilterApi.eq(
+intColumn(n),
+
Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().intValue()
+  .asInstanceOf[Integer]).orNull)
+case ParquetSchemaType(DECIMAL, INT64, decimal) if pushDownDecimal =>
+  (n: String, v: Any) => FilterApi.eq(
+longColumn(n),
+
Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().longValue()
+  .asInstanceOf[java.lang.Long]).orNull)
+// Legacy DecimalType
+case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, decimal) if 
pushDownDecimal &&
--- End diff --

The binary used for the legacy type and for fixed-length storage should be 
the same, so I don't understand why there are two different conversion methods. 
Also, because this now uses the Parquet schema, there's no need to base the 
length of this binary on what older versions of Spark did -- in other words, if 
the underlying Parquet type is fixed-length, then just convert the decimal to a 
binary of that fixed size without worrying about legacy types.

I think this should pass in the fixed array's length and convert the 
BigDecimal value to an array of that length in all cases. That works no matter 
what the file contains.
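
A sketch of that conversion, assuming the fixed type's length is passed in; the helper 
name is made up:
```scala
import java.math.{BigDecimal => JBigDecimal}

import org.apache.parquet.io.api.Binary

// Hypothetical helper: sign-extend the unscaled two's-complement bytes of the decimal
// to exactly numBytes, the length declared by the file's FIXED_LEN_BYTE_ARRAY type.
def decimalToFixedLenByteArray(decimal: JBigDecimal, numBytes: Int): Binary = {
  val unscaled = decimal.unscaledValue().toByteArray   // big-endian two's complement
  require(unscaled.length <= numBytes,
    s"Decimal $decimal does not fit in $numBytes bytes")
  val fixed = new Array[Byte](numBytes)
  val pad = (if (decimal.signum() < 0) -1 else 0).toByte
  java.util.Arrays.fill(fixed, 0, numBytes - unscaled.length, pad)
  System.arraycopy(unscaled, 0, fixed, numBytes - unscaled.length, unscaled.length)
  Binary.fromConstantByteArray(fixed)
}
```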


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...

2018-07-04 Thread rdblue
Github user rdblue closed the pull request at:

https://github.com/apache/spark/pull/21306


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...

2018-07-04 Thread rdblue
GitHub user rdblue reopened a pull request:

https://github.com/apache/spark/pull/21306

[SPARK-24252][SQL] Add DataSourceV2 mix-in for catalog support.

## What changes were proposed in this pull request?

This adds a mix-in to `DataSourceV2` that allows implementations to support 
catalog operations: load table (for schema), create table, drop table, and 
alter table.

This does not include the proposed transactional API.

## How was this patch tested?

Work in progress. This is for discussion right now.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark 
SPARK-24252-add-datasource-v2-catalog

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21306.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21306


commit 915d72f8a3cf8df428ecdac25f30545d963ee5f7
Author: Ryan Blue 
Date:   2018-05-05T01:13:01Z

SPARK-24252: Add v2 data source mix-in for catalog support.

commit fb55395a657588d1f87b13dbb1793d13991dc2b0
Author: Ryan Blue 
Date:   2018-05-11T21:27:47Z

SPARK-24252: Add copyright headers.

commit 42ed4a4a138e5c5f681755d871fd9d9030a4619a
Author: Ryan Blue 
Date:   2018-07-04T17:02:52Z

SPARK-24252: Update for review comments.

* Rename CatalogSupport to TableSupport
* Rename DataSourceCatalog to TableCatalog
* Remove name and database from Table

commit 023995d15b4293fac1530da6bd966b6ab6823980
Author: Ryan Blue 
Date:   2018-07-04T17:19:45Z

SPARK-24252: Add TableChange example to Javadocs.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for catalog s...

2018-07-04 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21306
  
@cloud-fan, I've updated this to address your comments. Thanks for the 
reviews!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...

2018-07-04 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r200178463
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableChange.java 
---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.catalog;
+
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * TableChange subclasses represent requested changes to a table. These 
are passed to
+ * {@link DataSourceCatalog#alterTable}.
+ */
+public interface TableChange {
+
+  /**
+   * Create a TableChange for adding a top-level column to a table.
+   * 
+   * Because "." may be interpreted as a field path separator or may be 
used in field names, it is
+   * not allowed in names passed to this method. To add to nested types or 
to add fields with
+   * names that contain ".", use {@link #addColumn(String, String, 
DataType)}.
+   *
+   * @param name the new top-level column name
+   * @param dataType the new column's data type
+   * @return a TableChange for the addition
+   */
+  static TableChange addColumn(String name, DataType dataType) {
+return new AddColumn(null, name, dataType);
+  }
+
+  /**
+   * Create a TableChange for adding a nested column to a table.
+   * 
+   * The parent name is used to find the parent struct type where the 
nested field will be added.
+   * If the parent name is null, the new column will be added to the root 
as a top-level column.
+   * If parent identifies a struct, a new column is added to that struct. 
If it identifies a list,
+   * the column is added to the list element struct, and if it identifies 
a map, the new column is
+   * added to the map's value struct.
+   * 
+   * The given name is used to name the new column and names containing 
"." are not handled
+   * differently.
+   *
+   * @param parent the new field's parent
+   * @param name the new field name
+   * @param dataType the new field's data type
+   * @return a TableChange for the addition
+   */
+  static TableChange addColumn(String parent, String name, DataType 
dataType) {
+return new AddColumn(parent, name, dataType);
+  }
+
+  /**
+   * Create a TableChange for renaming a field.
+   * 
+   * The name is used to find the field to rename. The new name will 
replace the name of the type.
+   * For example, renameColumn("a.b.c", "x") should produce column a.b.x.
--- End diff --

I added an example to the Javadocs:

```scala
import TableChange._
val catalog = source.asInstanceOf[TableSupport].catalog()
catalog.alterTable(ident,
    addColumn("x", IntegerType),
    renameColumn("a", "b"),
    deleteColumn("c")
  )
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...

2018-07-04 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r200177371
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableChange.java 
---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.catalog;
+
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * TableChange subclasses represent requested changes to a table. These 
are passed to
+ * {@link DataSourceCatalog#alterTable}.
+ */
+public interface TableChange {
+
+  /**
+   * Create a TableChange for adding a top-level column to a table.
+   * 
+   * Because "." may be interpreted as a field path separator or may be 
used in field names, it is
+   * not allowed in names passed to this method. To add to nested types or 
to add fields with
+   * names that contain ".", use {@link #addColumn(String, String, 
DataType)}.
+   *
+   * @param name the new top-level column name
+   * @param dataType the new column's data type
+   * @return a TableChange for the addition
+   */
+  static TableChange addColumn(String name, DataType dataType) {
+return new AddColumn(null, name, dataType);
+  }
+
+  /**
+   * Create a TableChange for adding a nested column to a table.
+   * 
+   * The parent name is used to find the parent struct type where the 
nested field will be added.
+   * If the parent name is null, the new column will be added to the root 
as a top-level column.
+   * If parent identifies a struct, a new column is added to that struct. 
If it identifies a list,
+   * the column is added to the list element struct, and if it identifies 
a map, the new column is
+   * added to the map's value struct.
+   * 
+   * The given name is used to name the new column and names containing 
"." are not handled
+   * differently.
+   *
+   * @param parent the new field's parent
+   * @param name the new field name
+   * @param dataType the new field's data type
+   * @return a TableChange for the addition
+   */
+  static TableChange addColumn(String parent, String name, DataType 
dataType) {
+return new AddColumn(parent, name, dataType);
+  }
+
+  /**
+   * Create a TableChange for renaming a field.
+   * 
+   * The name is used to find the field to rename. The new name will 
replace the name of the type.
+   * For example, renameColumn("a.b.c", "x") should produce column a.b.x.
+   *
+   * @param name the current field name
+   * @param newName the new name
+   * @return a TableChange for the rename
+   */
+  static TableChange renameColumn(String name, String newName) {
+return new RenameColumn(name, newName);
+  }
+
+  /**
+   * Create a TableChange for updating the type of a field.
+   * 
+   * The name is used to find the field to update.
+   *
+   * @param name the field name
+   * @param newDataType the new data type
+   * @return a TableChange for the update
+   */
+  static TableChange updateColumn(String name, DataType newDataType) {
+return new UpdateColumn(name, newDataType);
+  }
+
+  /**
+   * Create a TableChange for deleting a field from a table.
+   *
+   * @param name the name of the field to delete
+   * @return a TableChange for the delete
+   */
+  static TableChange deleteColumn(String name) {
+return new DeleteColumn(name);
+  }
+
+  final class AddColumn implements TableChange {
--- End diff --

And, I'm not sure it's possible to implement unapply in Java. Not even 
implementing Product works.
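
A possible Scala-side workaround is to define extractors next to the implementation 
instead; the accessor names used here (`parent`, `name`, `dataType`) are assumptions 
about `AddColumn`:
```scala
import org.apache.spark.sql.sources.v2.catalog.TableChange
import org.apache.spark.sql.types.DataType

// Hypothetical extractor so implementations can pattern match on the Java classes:
//   changes.foreach { case AddColumnChange(parent, name, dataType) => ... }
object AddColumnChange {
  def unapply(change: TableChange.AddColumn): Option[(String, String, DataType)] =
    Some((change.parent, change.name, change.dataType))
}
```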


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...

2018-07-04 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r200174526
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableChange.java 
---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.catalog;
+
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * TableChange subclasses represent requested changes to a table. These 
are passed to
+ * {@link DataSourceCatalog#alterTable}.
+ */
+public interface TableChange {
+
+  /**
+   * Create a TableChange for adding a top-level column to a table.
+   * 
+   * Because "." may be interpreted as a field path separator or may be 
used in field names, it is
+   * not allowed in names passed to this method. To add to nested types or 
to add fields with
+   * names that contain ".", use {@link #addColumn(String, String, 
DataType)}.
+   *
+   * @param name the new top-level column name
+   * @param dataType the new column's data type
+   * @return a TableChange for the addition
+   */
+  static TableChange addColumn(String name, DataType dataType) {
+return new AddColumn(null, name, dataType);
+  }
+
+  /**
+   * Create a TableChange for adding a nested column to a table.
+   * 
+   * The parent name is used to find the parent struct type where the 
nested field will be added.
+   * If the parent name is null, the new column will be added to the root 
as a top-level column.
+   * If parent identifies a struct, a new column is added to that struct. 
If it identifies a list,
+   * the column is added to the list element struct, and if it identifies 
a map, the new column is
+   * added to the map's value struct.
+   * 
+   * The given name is used to name the new column and names containing 
"." are not handled
+   * differently.
+   *
+   * @param parent the new field's parent
+   * @param name the new field name
+   * @param dataType the new field's data type
+   * @return a TableChange for the addition
+   */
+  static TableChange addColumn(String parent, String name, DataType 
dataType) {
+return new AddColumn(parent, name, dataType);
+  }
+
+  /**
+   * Create a TableChange for renaming a field.
+   * 
+   * The name is used to find the field to rename. The new name will 
replace the name of the type.
+   * For example, renameColumn("a.b.c", "x") should produce column a.b.x.
+   *
+   * @param name the current field name
+   * @param newName the new name
+   * @return a TableChange for the rename
+   */
+  static TableChange renameColumn(String name, String newName) {
+return new RenameColumn(name, newName);
+  }
+
+  /**
+   * Create a TableChange for updating the type of a field.
+   * 
+   * The name is used to find the field to update.
+   *
+   * @param name the field name
+   * @param newDataType the new data type
+   * @return a TableChange for the update
+   */
+  static TableChange updateColumn(String name, DataType newDataType) {
+return new UpdateColumn(name, newDataType);
+  }
+
+  /**
+   * Create a TableChange for deleting a field from a table.
+   *
+   * @param name the name of the field to delete
+   * @return a TableChange for the delete
+   */
+  static TableChange deleteColumn(String name) {
+return new DeleteColumn(name);
+  }
+
+  final class AddColumn implements TableChange {
--- End diff --

Nevermind, I forgot that these are in an interface so they are 
automatically public.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...

2018-07-04 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r200173138
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/Table.java ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.catalog;
+
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents table metadata from a {@link DataSourceCatalog}.
+ */
+public interface Table {
--- End diff --

I'll just remove `name` and `database`. We can add them later when we 
figure out how we want to handle table naming. We need partitioning right away, though.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...

2018-07-04 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r200171696
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableChange.java 
---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.catalog;
+
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * TableChange subclasses represent requested changes to a table. These 
are passed to
+ * {@link DataSourceCatalog#alterTable}.
+ */
+public interface TableChange {
+
+  /**
+   * Create a TableChange for adding a top-level column to a table.
+   * 
+   * Because "." may be interpreted as a field path separator or may be 
used in field names, it is
+   * not allowed in names passed to this method. To add to nested types or 
to add fields with
+   * names that contain ".", use {@link #addColumn(String, String, 
DataType)}.
+   *
+   * @param name the new top-level column name
+   * @param dataType the new column's data type
+   * @return a TableChange for the addition
+   */
+  static TableChange addColumn(String name, DataType dataType) {
+return new AddColumn(null, name, dataType);
+  }
+
+  /**
+   * Create a TableChange for adding a nested column to a table.
+   * 
+   * The parent name is used to find the parent struct type where the 
nested field will be added.
+   * If the parent name is null, the new column will be added to the root 
as a top-level column.
+   * If parent identifies a struct, a new column is added to that struct. 
If it identifies a list,
+   * the column is added to the list element struct, and if it identifies 
a map, the new column is
+   * added to the map's value struct.
+   * 
+   * The given name is used to name the new column and names containing 
"." are not handled
+   * differently.
+   *
+   * @param parent the new field's parent
+   * @param name the new field name
+   * @param dataType the new field's data type
+   * @return a TableChange for the addition
+   */
+  static TableChange addColumn(String parent, String name, DataType 
dataType) {
+return new AddColumn(parent, name, dataType);
+  }
+
+  /**
+   * Create a TableChange for renaming a field.
+   * 
+   * The name is used to find the field to rename. The new name will 
replace the name of the type.
+   * For example, renameColumn("a.b.c", "x") should produce column a.b.x.
+   *
+   * @param name the current field name
+   * @param newName the new name
+   * @return a TableChange for the rename
+   */
+  static TableChange renameColumn(String name, String newName) {
+return new RenameColumn(name, newName);
+  }
+
+  /**
+   * Create a TableChange for updating the type of a field.
+   * 
+   * The name is used to find the field to update.
+   *
+   * @param name the field name
+   * @param newDataType the new data type
+   * @return a TableChange for the update
+   */
+  static TableChange updateColumn(String name, DataType newDataType) {
+return new UpdateColumn(name, newDataType);
+  }
+
+  /**
+   * Create a TableChange for deleting a field from a table.
+   *
+   * @param name the name of the field to delete
+   * @return a TableChange for the delete
+   */
+  static TableChange deleteColumn(String name) {
+return new DeleteColumn(name);
+  }
+
+  final class AddColumn implements TableChange {
--- End diff --

Just noticed that these aren't public, but should be because they will be 
passed to implementations through `alterTable`.

These should also implement `unapply` for Scala implementations.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...

2018-07-04 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r200171424
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/Table.java ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.catalog;
+
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents table metadata from a {@link DataSourceCatalog}.
+ */
+public interface Table {
--- End diff --

I updated the last comment because I thought this was referring to 
`CatalogSupport` at first. Sorry about the confusion.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...

2018-07-04 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r200170560
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/TableChange.java 
---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.catalog;
+
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * TableChange subclasses represent requested changes to a table. These 
are passed to
+ * {@link DataSourceCatalog#alterTable}.
+ */
+public interface TableChange {
+
+  /**
+   * Create a TableChange for adding a top-level column to a table.
+   * 
+   * Because "." may be interpreted as a field path separator or may be 
used in field names, it is
+   * not allowed in names passed to this method. To add to nested types or 
to add fields with
+   * names that contain ".", use {@link #addColumn(String, String, 
DataType)}.
+   *
+   * @param name the new top-level column name
+   * @param dataType the new column's data type
+   * @return a TableChange for the addition
+   */
+  static TableChange addColumn(String name, DataType dataType) {
+return new AddColumn(null, name, dataType);
+  }
+
+  /**
+   * Create a TableChange for adding a nested column to a table.
+   * 
+   * The parent name is used to find the parent struct type where the 
nested field will be added.
+   * If the parent name is null, the new column will be added to the root 
as a top-level column.
+   * If parent identifies a struct, a new column is added to that struct. 
If it identifies a list,
+   * the column is added to the list element struct, and if it identifies 
a map, the new column is
+   * added to the map's value struct.
+   * 
+   * The given name is used to name the new column and names containing 
"." are not handled
+   * differently.
+   *
+   * @param parent the new field's parent
+   * @param name the new field name
+   * @param dataType the new field's data type
+   * @return a TableChange for the addition
+   */
+  static TableChange addColumn(String parent, String name, DataType 
dataType) {
+return new AddColumn(parent, name, dataType);
+  }
+
+  /**
+   * Create a TableChange for renaming a field.
+   * 
+   * The name is used to find the field to rename. The new name will 
replace the name of the type.
+   * For example, renameColumn("a.b.c", "x") should produce column a.b.x.
--- End diff --

Are you looking for examples in Javadoc, or an example implementation?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...

2018-07-04 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r200170480
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/catalog/Table.java ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.catalog;
+
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents table metadata from a {@link DataSourceCatalog}.
+ */
+public interface Table {
--- End diff --

This interface is for named tables, not path-based tables. I think that 
would probably be a different interface because not all sources can support 
path-based tables. Cassandra and JDBC are good examples.

For the table metadata, I think we do need partitions. Iceberg creates 
partitioned tables and I'd like to start getting the DDL operations working. 
This is why I proposed this metadata on the SPIP a few months ago. We seem to 
have lazy consensus around it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for ca...

2018-07-04 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r200170491
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/CatalogSupport.java ---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.sources.v2.catalog.DataSourceCatalog;
+
+/**
+ * A mix-in interface for {@link DataSourceV2} catalog support. Data 
sources can implement this
+ * interface to provide the ability to load, create, alter, and drop 
tables.
+ * 
+ * Data sources must implement this interface to support logical 
operations that combine writing
+ * data with catalog tasks, like create-table-as-select.
+ */
+public interface CatalogSupport {
--- End diff --

Works for me.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-04 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21696
  
Thanks, @wangyum! I think this refactor was a good idea.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199980993
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -19,166 +19,186 @@ package 
org.apache.spark.sql.execution.datasources.parquet
 
 import java.sql.Date
 
+import scala.collection.JavaConverters._
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
 
+  case class ParquetSchemaType(
+  originalType: OriginalType,
+  primitiveTypeName: PrimitiveType.PrimitiveTypeName,
+  decimalMetadata: DecimalMetadata)
+
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
-  private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
+// BooleanType
+case ParquetSchemaType(null, BOOLEAN, null) =>
   (n: String, v: Any) => FilterApi.eq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+// IntegerType
+case ParquetSchemaType(null, INT32, null) =>
   (n: String, v: Any) => FilterApi.eq(intColumn(n), 
v.asInstanceOf[Integer])
-case LongType =>
+// LongType
+case ParquetSchemaType(null, INT64, null) =>
   (n: String, v: Any) => FilterApi.eq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
-case FloatType =>
+// FloatType
+case ParquetSchemaType(null, FLOAT, null) =>
   (n: String, v: Any) => FilterApi.eq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
-case DoubleType =>
+// DoubleType
+case ParquetSchemaType(null, DOUBLE, null) =>
   (n: String, v: Any) => FilterApi.eq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
 
+// StringType
 // Binary.fromString and Binary.fromByteArray don't accept null values
-case StringType =>
+case ParquetSchemaType(UTF8, BINARY, null) =>
   (n: String, v: Any) => FilterApi.eq(
 binaryColumn(n),
 Option(v).map(s => 
Binary.fromString(s.asInstanceOf[String])).orNull)
-case BinaryType =>
+// BinaryType
+case ParquetSchemaType(null, BINARY, null) =>
   (n: String, v: Any) => FilterApi.eq(
 binaryColumn(n),
 Option(v).map(b => 
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-case DateType if pushDownDate =>
+// DateType
+case ParquetSchemaType(DATE, INT32, null) if pushDownDate =>
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
-  private val makeNotEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) 
=> FilterPredicate] = {
+case ParquetSchemaType(null, BOOLEAN, null) =>
   (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+case ParquetSchemaType(null, INT32, null) =>
   (n: String, v: Any) => FilterApi.notEq(intColumn(n), 
v.asInstanceOf[Integer])
-case LongType =>
+case ParquetSchemaType(null, INT64, null) =>
   (n: String, v: Any) => FilterApi.notEq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
-case FloatType =>
+case ParquetSchemaType(null, FLOAT, null) =>
   (n: String, v: Any) => FilterApi.notEq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
-case DoubleType =>
+case ParquetSchemaType(null, DOUBLE, null) =>
   (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])

[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199980897
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -19,166 +19,186 @@ package 
org.apache.spark.sql.execution.datasources.parquet
 
 import java.sql.Date
 
+import scala.collection.JavaConverters._
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
 
+  case class ParquetSchemaType(
+  originalType: OriginalType,
+  primitiveTypeName: PrimitiveType.PrimitiveTypeName,
+  decimalMetadata: DecimalMetadata)
+
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
-  private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
+// BooleanType
--- End diff --

The other partial functions don't have these comments. Is that on purpose? 
Maybe these should be constants instead to make the code more readable and 
consistent?
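
A sketch of the constant-based alternative (names here are illustrative, not this PR's 
final naming), shown for just two cases of `makeEq`:
```scala
// Fragment of ParquetFilters: name the schema patterns once and match on the constants,
// so the partial functions stay readable without per-case comments.
private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)

private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
  case ParquetBooleanType =>
    (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
  case ParquetIntegerType =>
    (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
}
```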


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199980632
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -19,166 +19,186 @@ package 
org.apache.spark.sql.execution.datasources.parquet
 
 import java.sql.Date
 
+import scala.collection.JavaConverters._
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
 
+  case class ParquetSchemaType(
+  originalType: OriginalType,
+  primitiveTypeName: PrimitiveType.PrimitiveTypeName,
+  decimalMetadata: DecimalMetadata)
+
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
-  private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
+// BooleanType
+case ParquetSchemaType(null, BOOLEAN, null) =>
   (n: String, v: Any) => FilterApi.eq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+// IntegerType
+case ParquetSchemaType(null, INT32, null) =>
--- End diff --

Before, it was a valid assumption that the value's type matched the 
`DataType`. Now that this is the file's type, that might not be the case. For 
example, byte and short are stored as INT32. This should cast to Number and 
then convert to the file's type.

I would also do this for INT64 columns, in case the schema has evolved and 
a column that was INT32 is now INT64. The converters (used to materialize 
records) don't currently support this, but it would be reasonable for them to 
support it eventually.
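
A rough sketch of the kind of conversion I mean (illustrative only, not the 
exact code in this PR):

```
// Sketch: accept any java.lang.Number for an INT32 column, since ByteType,
// ShortType and IntegerType all map to INT32 in the file.
case ParquetSchemaType(null, INT32, null) =>
  (n: String, v: Any) => FilterApi.eq(
    intColumn(n),
    Option(v).map(_.asInstanceOf[java.lang.Number].intValue.asInstanceOf[Integer]).orNull)

// Same idea for INT64, which would also cover a column that evolved from
// INT32 to INT64.
case ParquetSchemaType(null, INT64, null) =>
  (n: String, v: Any) => FilterApi.eq(
    longColumn(n),
    Option(v).map(
      _.asInstanceOf[java.lang.Number].longValue.asInstanceOf[java.lang.Long]).orNull)
```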


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199979463
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -379,14 +366,29 @@ class ParquetFileFormat
   null)
 
   val sharedConf = broadcastedHadoopConf.value.value
+
+  val fileMetaData =
+ParquetFileReader.readFooter(sharedConf, fileSplit.getPath, 
SKIP_ROW_GROUPS).getFileMetaData
--- End diff --

We should *always* read the footer and adjust the filters to match. In our 
version we use the following to handle situations where the column names have 
changed case or where Hive has returned a different case:

```
  // Try to push down filters when filter push-down is enabled.
  val pushed = if (pushdownEnabled) {
// read the file schema to create Parquet filters that match case
val fileReader = ParquetFileReader.open(conf, fileSplit.getPath)
val fileSchema = try {
  new ParquetToSparkSchemaConverter(conf).convert(
ParquetReadSupport.clipParquetSchema(
  fileReader.getFileMetaData.getSchema, requiredSchema))
} finally {
  fileReader.close()
}

filters
// Collects all converted Parquet filter predicates. Notice 
that not all predicates can
// be converted (`ParquetFilters.createFilter` returns an 
`Option`). That's why a
// `flatMap` is used here.
.flatMap(ParquetFilters.createFilter(fileSchema, _))
.reduceOption(FilterApi.and)
  } else {
None
  }
```

That's pretty much the same thing as here. In addition, if columns can be 
renamed within a Parquet file then you need to push filters for the names used 
in the file's schema. I don't think that's a problem here because I don't know 
of a way to rename columns in a Spark Parquet table. (Iceberg uses field IDs to 
do this.)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21682: [SPARK-24706][SQL] ByteType and ShortType support pushdo...

2018-07-03 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21682
  
+1

I agree with some of the minor refactoring suggestions, but overall this 
looks correct to me.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21682: [SPARK-24706][SQL] ByteType and ShortType support...

2018-07-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21682#discussion_r199977784
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -69,6 +77,14 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   private val makeNotEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
 case BooleanType =>
   (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
+case ByteType =>
--- End diff --

Usually, both byte and short would be stored as integers in Parquet. 
Because Parquet uses bit packing, it doesn't matter if you store them as ints 
(or even longs) because they'll get packed into the same space.

The important thing is to match the Parquet file's type when pushing a 
filter. Since Spark stores ByteType and ShortType in Parquet as INT32, this is 
correct.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21682: [SPARK-24706][SQL] ByteType and ShortType support...

2018-07-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21682#discussion_r199977420
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -42,6 +42,14 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean, pushDownStartWith:
   private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
 case BooleanType =>
   (n: String, v: Any) => FilterApi.eq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
+case ByteType =>
+  (n: String, v: Any) => FilterApi.eq(
+intColumn(n),
+Option(v).map(b => 
b.asInstanceOf[java.lang.Byte].toInt.asInstanceOf[Integer]).orNull)
--- End diff --

I agree.

Also, there's no need to use `Option.map` because the value cannot be null. 
That's why the `IntegerType` case just casts the value.
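
In other words, something along these lines (a sketch, assuming the value is 
never null here, as noted above):

```
// Sketch: cast through Number so Byte, Short and Int literals all work for
// the INT32 column; no Option wrapper needed for a non-null value.
case ByteType | ShortType | IntegerType =>
  (n: String, v: Any) =>
    FilterApi.eq(intColumn(n), v.asInstanceOf[java.lang.Number].intValue.asInstanceOf[Integer])
```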


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for catalog s...

2018-07-03 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21306
  
@cloud-fan, thanks for the thorough feedback!

> What catalog operations we want to forward to the data source catalog? 
Currently it's create/drop/alter table, I think it's good enough for now.

This PR introduces create, drop, and alter. We can always add more later. 
These are the ones that we need to implement DataSourceV2 operations and DDL 
support.

> Spark provides an API so that end-users can do it directly. e.g. 
`spark.catalog("iceberge").createTable(...)`, or SQL API `CREATE TABLE 
iceberge.db1.tbl1 . . .`

These two are the easiest and least intrusive way to start because the data 
source catalog interaction is explicitly tied to a catalog. It also matches the 
behavior used by other systems for multiple catalogs. I think this is what we 
should start with and then tackle ideas like your second point.

> When creating/dropping/altering Spark tables, also forward it to the data 
source catalog. . .

For this and a couple other questions, I don't think we need to decide 
right now. This PR is about getting the interface for other sources in Spark. 
We don't necessarily need to know all of the ways that users will call it or 
interact with it, like how `DESC TABLE` will work.

To your question here, I'm not sure whether the `CREATE TABLE ... USING 
source` syntax should use the default catalog or defer to the catalog for 
`source` or forward to both, but that doesn't need to block adding this API 
because I think we can decide it later. In addition, we should probably discuss 
this on the dev list to make sure we get the behavior right.

> How to lookup the table metadata from data source catalog?

The SPIP proposes two catalog interfaces that return `Table`. One that uses 
table identifiers and one that uses paths. Data sources can implement support 
for both or just one. This PR includes just the support for table identifiers. 
We would add a similar API for path-based tables in another PR.

> How to define table metadata? Maybe we can forward `DESC TABLE` . . .

That sounds like a reasonable idea to me. Like the behavior of `USING`, I 
don't think this is something that we have to decide right now. We can add 
support later as we implement table DDL. Maybe `Table` should return a DF that 
is its `DESCRIBE` output.

> How does the table metadata involve in data reading/writing?

This is another example of something we don't need to decide yet. We have a 
couple different options for the behavior and will want to think them through 
and discuss them on the dev list. But I don't think that the behavior 
necessarily needs to be decided before we add this API to sources.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-06-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r198907669
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 ---
@@ -359,6 +369,70 @@ class ParquetFilterSuite extends QueryTest with 
ParquetTest with SharedSQLContex
 }
   }
 
+  test("filter pushdown - decimal") {
+Seq(true, false).foreach { legacyFormat =>
+  withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> 
legacyFormat.toString) {
+Seq(s"_1 decimal(${Decimal.MAX_INT_DIGITS}, 2)", // 
32BitDecimalType
+  s"_1 decimal(${Decimal.MAX_LONG_DIGITS}, 2)",  // 
64BitDecimalType
+  "_1 decimal(38, 18)"   // 
ByteArrayDecimalType
+).foreach { schemaDDL =>
+  val schema = StructType.fromDDL(schemaDDL)
+  val rdd =
+spark.sparkContext.parallelize((1 to 4).map(i => Row(new 
java.math.BigDecimal(i
+  val dataFrame = spark.createDataFrame(rdd, schema)
+  testDecimalPushDown(dataFrame) { implicit df =>
+assert(df.schema === schema)
+checkFilterPredicate('_1.isNull, classOf[Eq[_]], 
Seq.empty[Row])
+checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 
4).map(Row.apply(_)))
+
+checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+checkFilterPredicate('_1 <=> 1, classOf[Eq[_]], 1)
+checkFilterPredicate('_1 =!= 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
+
+checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
+checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
+checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+
+checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+checkFilterPredicate(Literal(1) <=> '_1, classOf[Eq[_]], 1)
+checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
+checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
+checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+
+checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+checkFilterPredicate('_1 < 2 || '_1 > 3, 
classOf[Operators.Or], Seq(Row(1), Row(4)))
+  }
+}
+  }
+}
+  }
+
+  test("incompatible parquet file format will throw exeception") {
--- End diff --

If we can detect the case where the data is written with the legacy format, 
then why do we need a property to read with the legacy format? Why not do the 
right thing without a property?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-06-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r198906232
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 ---
@@ -359,6 +369,70 @@ class ParquetFilterSuite extends QueryTest with 
ParquetTest with SharedSQLContex
 }
   }
 
+  test("filter pushdown - decimal") {
+Seq(true, false).foreach { legacyFormat =>
+  withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> 
legacyFormat.toString) {
+Seq(s"_1 decimal(${Decimal.MAX_INT_DIGITS}, 2)", // 
32BitDecimalType
--- End diff --

Since this is providing a column name, it would be better to use something 
more readable than _1.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-06-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r198904779
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -62,6 +98,30 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean) {
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+case decimal: DecimalType
+  if pushDownDecimal && (DecimalType.is32BitDecimalType(decimal) && 
!readLegacyFormat) =>
+  (n: String, v: Any) => FilterApi.eq(
+intColumn(n),
+
Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().intValue()
--- End diff --

Does this need to validate the scale of the decimal, or is scale adjusted 
in the analyzer?
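
If the analyzer doesn't already guarantee the scale, one defensive option is 
to rescale the literal to the column's scale before taking the unscaled value. 
A sketch, not necessarily what this PR should do:

```
// Sketch: setScale without a rounding mode throws if the literal cannot be
// represented exactly at the column's scale, instead of silently pushing a
// wrong unscaled value.
case decimal: DecimalType
    if pushDownDecimal && DecimalType.is32BitDecimalType(decimal) =>
  (n: String, v: Any) => FilterApi.eq(
    intColumn(n),
    Option(v).map { raw =>
      val rescaled = raw.asInstanceOf[JBigDecimal].setScale(decimal.scale)
      rescaled.unscaledValue().intValue().asInstanceOf[Integer]
    }.orNull)
```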


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-06-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r198904504
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -62,6 +98,30 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean) {
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+case decimal: DecimalType
+  if pushDownDecimal && (DecimalType.is32BitDecimalType(decimal) && 
!readLegacyFormat) =>
+  (n: String, v: Any) => FilterApi.eq(
+intColumn(n),
+
Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().intValue()
+  .asInstanceOf[Integer]).orNull)
+case decimal: DecimalType
+  if pushDownDecimal && (DecimalType.is64BitDecimalType(decimal) && 
!readLegacyFormat) =>
+  (n: String, v: Any) => FilterApi.eq(
+longColumn(n),
+
Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().longValue()
+.asInstanceOf[java.lang.Long]).orNull)
+case decimal: DecimalType
+  if pushDownDecimal && ((DecimalType.is32BitDecimalType(decimal) && 
readLegacyFormat)
--- End diff --

Please add comments here to explain what differs when `readLegacyFormat` is 
true.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...

2018-06-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21556#discussion_r198904089
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -378,6 +378,22 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED =
+buildConf("spark.sql.parquet.filterPushdown.decimal")
+  .doc(s"If true, enables Parquet filter push-down optimization for 
Decimal. " +
+"The default value is false to compatible with legacy parquet 
format. " +
+s"This configuration only has an effect when 
'${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is " +
+"enabled and Decimal statistics are generated(Since Spark 2.4).")
+  .internal()
+  .booleanConf
+  .createWithDefault(true)
+
+  val PARQUET_READ_LEGACY_FORMAT = 
buildConf("spark.sql.parquet.readLegacyFormat")
--- End diff --

This property doesn't mention pushdown, but the description says it is only 
valid for push-down. Can you make the property name more clear?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21623: [SPARK-24638][SQL] StringStartsWith support push down

2018-06-27 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21623
  
Overall, I think this is close. The tests need to cover the row group stats 
case and we should update how configuration is passed to the filters. Thanks 
for working on this, @wangyum!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21623: [SPARK-24638][SQL] StringStartsWith support push ...

2018-06-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21623#discussion_r198553569
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -22,16 +22,23 @@ import java.sql.Date
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
+import org.apache.parquet.schema.PrimitiveComparator
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
-private[parquet] class ParquetFilters(pushDownDate: Boolean) {
+private[parquet] class ParquetFilters() {
+
+  val sqlConf: SQLConf = SQLConf.get
--- End diff --

This should pass in `pushDownDate` and `pushDownStartWith` like the 
previous version did with just the date setting.

The SQLConf is already available in ParquetFileFormat and it *would* be 
better to pass it in. The problem is that this class is instantiated in the 
function (`(file: PartitionedFile) => { ... }`) that gets serialized and sent 
to executors. That means we don't want SQLConf and its references in the 
function's closure. The way we got around this before was to put boolean config 
vals in the closure instead. I think you should go with that approach.

I'm not sure what `SQLConf.get` is for or what a correct use would be. 
@gatorsmile, can you comment on use of `SQLConf.get`?
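
To make the first point concrete, the pattern I have in mind is roughly this 
(a sketch of the file format's reader-building code; the conf accessor names 
are assumptions):

```
// Sketch: read plain booleans on the driver, then close over only those
// primitives so SQLConf itself is never captured by the serialized function.
val pushDownDate = sqlConf.parquetFilterPushDownDate                 // assumed accessor
val pushDownStartWith = sqlConf.parquetFilterPushDownStringStartWith // assumed accessor

val readFunc = (file: PartitionedFile) => {
  // Only the two booleans travel with this closure to the executors.
  val parquetFilters = new ParquetFilters(pushDownDate, pushDownStartWith)
  // ... build the filters and the per-file reader using parquetFilters ...
}
```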


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21623: [SPARK-24638][SQL] StringStartsWith support push ...

2018-06-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21623#discussion_r198551889
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 ---
@@ -660,6 +661,56 @@ class ParquetFilterSuite extends QueryTest with 
ParquetTest with SharedSQLContex
   assert(df.where("col > 0").count() === 2)
 }
   }
+
+  test("filter pushdown - StringStartsWith") {
+withParquetDataFrame((1 to 4).map(i => Tuple1(i + "str" + i))) { 
implicit df =>
--- End diff --

I think all of these tests go through the `keep` method rather than `canDrop` 
and `inverseCanDrop`, and those two methods also need to be tested. You 
can do that by constructing a Parquet file with row groups that have 
predictable statistics, but that would be difficult. An easier way to do this 
is to define the predicate class elsewhere and create a unit test for it that 
passes in different statistics values.
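
For example, the min/max logic could live in a small helper that a plain unit 
test drives directly (a sketch, with names made up here):

```
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.PrimitiveComparator

// Sketch: statistics-free helper holding the same comparison logic, so tests
// can pass arbitrary min/max values without building real Parquet row groups.
object StartsWithPushdown {
  private val comparator = PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR

  def canDrop(prefix: Binary, min: Binary, max: Binary): Boolean = {
    val size = prefix.length
    comparator.compare(max.slice(0, math.min(size, max.length)), prefix) < 0 ||
      comparator.compare(min.slice(0, math.min(size, min.length)), prefix) > 0
  }
}

// In a unit test:
// assert(StartsWithPushdown.canDrop(
//   Binary.fromString("str2"), Binary.fromString("aaa"), Binary.fromString("abc")))
```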


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21262: [SPARK-24172][SQL]: Push projection and filters o...

2018-06-26 Thread rdblue
Github user rdblue closed the pull request at:

https://github.com/apache/spark/pull/21262


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for catalog s...

2018-06-26 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21306
  
@cloud-fan, what needs to change to get this in? I'd like to start making 
more PRs based on these changes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21623: [SPARK-24638][SQL] StringStartsWith support push ...

2018-06-26 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21623#discussion_r198244664
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -270,6 +277,29 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean) {
   case sources.Not(pred) =>
 createFilter(schema, pred).map(FilterApi.not)
 
+  case sources.StringStartsWith(name, prefix) if pushDownStartWith && 
canMakeFilterOn(name) =>
+Option(prefix).map { v =>
+  FilterApi.userDefined(binaryColumn(name),
+new UserDefinedPredicate[Binary] with Serializable {
+  private val strToBinary = 
Binary.fromReusedByteArray(v.getBytes)
+  private val size = strToBinary.length
+
+  override def canDrop(statistics: Statistics[Binary]): 
Boolean = {
+val comparator = 
PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR
+val max = statistics.getMax
+val min = statistics.getMin
+comparator.compare(max.slice(0, math.min(size, 
max.length)), strToBinary) < 0 ||
+  comparator.compare(min.slice(0, math.min(size, 
min.length)), strToBinary) > 0
+  }
+
+  override def inverseCanDrop(statistics: Statistics[Binary]): 
Boolean = false
--- End diff --

Sorry, I meant if the min and max both *start with* the prefix, then we should 
be able to drop the row group. The situation is where both min and max match, so 
all values must also match the filter. If we are looking for values that do not 
match the filter, then we can eliminate the row group.

The example is prefix=CCC and values are between min=CCCa and max=CCCZ: all 
values start with CCC, so the entire row group can be skipped.
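
In code, the inverse check could look roughly like this (a sketch, reusing 
`size` and `strToBinary` from the predicate above):

```
override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = {
  // Sketch: for NOT(StartsWith), every value in the row group matches the
  // prefix when both min and max start with it, so the group can be dropped.
  val comparator = PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR
  val max = statistics.getMax
  val min = statistics.getMin
  comparator.compare(max.slice(0, math.min(size, max.length)), strToBinary) == 0 &&
    comparator.compare(min.slice(0, math.min(size, min.length)), strToBinary) == 0
}
```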


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21623: [SPARK-24638][SQL] StringStartsWith support push ...

2018-06-26 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21623#discussion_r198230713
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
@@ -270,6 +277,29 @@ private[parquet] class ParquetFilters(pushDownDate: 
Boolean) {
   case sources.Not(pred) =>
 createFilter(schema, pred).map(FilterApi.not)
 
+  case sources.StringStartsWith(name, prefix) if pushDownStartWith && 
canMakeFilterOn(name) =>
+Option(prefix).map { v =>
+  FilterApi.userDefined(binaryColumn(name),
+new UserDefinedPredicate[Binary] with Serializable {
+  private val strToBinary = 
Binary.fromReusedByteArray(v.getBytes)
+  private val size = strToBinary.length
+
+  override def canDrop(statistics: Statistics[Binary]): 
Boolean = {
+val comparator = 
PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR
+val max = statistics.getMax
+val min = statistics.getMin
+comparator.compare(max.slice(0, math.min(size, 
max.length)), strToBinary) < 0 ||
+  comparator.compare(min.slice(0, math.min(size, 
min.length)), strToBinary) > 0
+  }
+
+  override def inverseCanDrop(statistics: Statistics[Binary]): 
Boolean = false
--- End diff --

Why can't this evaluate the inverse of `StartsWith`? If the min and max 
values exclude the prefix, then this should be able to filter.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21615: [SPARK-24552][core][sql] Use unique id instead of attemp...

2018-06-22 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21615
  
+1


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21606: [SPARK-24552][core][SQL] Use task ID instead of a...

2018-06-22 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21606#discussion_r197552309
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -125,11 +124,11 @@ object DataWritingSparkTask extends Logging {
 val coordinator = SparkEnv.get.outputCommitCoordinator
 val commitAuthorized = coordinator.canCommit(stageId, 
stageAttempt, partId, attemptId)
 if (commitAuthorized) {
-  logInfo(s"Writer for stage $stageId / $stageAttempt, " +
+  logInfo(s"Writer for stage $stageId.$stageAttempt, " +
 s"task $partId.$attemptId is authorized to commit.")
   dataWriter.commit()
 } else {
-  val message = s"Stage $stageId / $stageAttempt, " +
+  val message = s"Stage $stageId.$stageAttempt, " +
--- End diff --

+1 Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21606: [SPARK-24552][core][SQL] Use task ID instead of attempt ...

2018-06-22 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21606
  
+1


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21606: [SPARK-24552][core][SQL] Use task ID instead of a...

2018-06-22 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21606#discussion_r197547079
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala ---
@@ -76,13 +76,17 @@ object SparkHadoopWriter extends Logging {
 // Try to write all RDD partitions as a Hadoop OutputFormat.
 try {
   val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: 
Iterator[(K, V)]) => {
+// SPARK-24552: Generate a unique "attempt ID" based on the stage 
and task atempt numbers.
+// Assumes that there won't be more than Short.MaxValue attempts, 
at least not concurrently.
+val attemptId = (context.stageAttemptNumber << 16) | 
context.attemptNumber
--- End diff --

Okay, that makes sense if this is just for Hadoop attempt IDs.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21606: [SPARK-24552][core][SQL] Use task ID instead of a...

2018-06-22 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21606#discussion_r197543585
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala ---
@@ -104,12 +104,12 @@ object SparkHadoopWriter extends Logging {
   jobTrackerId: String,
   commitJobId: Int,
   sparkPartitionId: Int,
-  sparkAttemptNumber: Int,
+  sparkTaskId: Long,
   committer: FileCommitProtocol,
   iterator: Iterator[(K, V)]): TaskCommitMessage = {
 // Set up a task.
 val taskContext = config.createTaskAttemptContext(
-  jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
+  jobTrackerId, commitJobId, sparkPartitionId, sparkTaskId.toInt)
--- End diff --

To backport this, can we use the `.toInt` version? I think that should be 
safe.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21577: [SPARK-24589][core] Correctly identify tasks in output c...

2018-06-22 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21577
  
Thanks for fixing this, @vanzin!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21606: [SPARK-24552][core][SQL] Use task ID instead of a...

2018-06-22 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21606#discussion_r197542830
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -125,11 +124,11 @@ object DataWritingSparkTask extends Logging {
 val coordinator = SparkEnv.get.outputCommitCoordinator
 val commitAuthorized = coordinator.canCommit(stageId, 
stageAttempt, partId, attemptId)
 if (commitAuthorized) {
-  logInfo(s"Writer for stage $stageId / $stageAttempt, " +
+  logInfo(s"Writer for stage $stageId.$stageAttempt, " +
 s"task $partId.$attemptId is authorized to commit.")
   dataWriter.commit()
 } else {
-  val message = s"Stage $stageId / $stageAttempt, " +
+  val message = s"Stage $stageId.$stageAttempt, " +
--- End diff --

(This is for the next line, sorry for the confusion)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21606: [SPARK-24552][core][SQL] Use task ID instead of a...

2018-06-22 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21606#discussion_r197542704
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala ---
@@ -104,12 +104,12 @@ object SparkHadoopWriter extends Logging {
   jobTrackerId: String,
   commitJobId: Int,
   sparkPartitionId: Int,
-  sparkAttemptNumber: Int,
+  sparkTaskId: Long,
   committer: FileCommitProtocol,
   iterator: Iterator[(K, V)]): TaskCommitMessage = {
 // Set up a task.
 val taskContext = config.createTaskAttemptContext(
-  jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
+  jobTrackerId, commitJobId, sparkPartitionId, sparkTaskId.toInt)
--- End diff --

I commented before I saw this thread, but I think it is better to use the 
TID because it is already exposed in the UI, which makes it easier to correlate 
UI tasks with logs. The combined attempt number isn't used anywhere, so this 
would introduce yet another number to identify a task. And shifting by 16 means 
these numbers get large anyway.
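
For reference, the arithmetic involved (just to show the magnitude, nothing new):

```
// Packing: stage attempt 1, task attempt 3
val packed = (1 << 16) | 3        // 65539
// Unpacking, if anyone ever needs the parts back:
val stageAttempt = packed >>> 16  // 1
val taskAttempt = packed & 0xFFFF // 3
```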


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21606: [SPARK-24552][core][SQL] Use task ID instead of a...

2018-06-22 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21606#discussion_r197542014
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
 ---
@@ -42,15 +42,12 @@
*Usually Spark processes many RDD partitions at the 
same time,
*implementations should use the partition id to 
distinguish writers for
*different partitions.
-   * @param attemptNumber Spark may launch multiple tasks with the same 
task id. For example, a task
-   *  failed, Spark launches a new task wth the same 
task id but different
-   *  attempt number. Or a task is too slow, Spark 
launches new tasks wth the
-   *  same task id but different attempt number, which 
means there are multiple
-   *  tasks with the same task id running at the same 
time. Implementations can
-   *  use this attempt number to distinguish writers 
of different task attempts.
+   * @param taskId A unique identifier for a task that is performing the 
write of the partition
+   *   data. Spark may run multiple tasks for the same 
partition (due to speculation
+   *   or task failures, for example).
* @param epochId A monotonically increasing id for streaming queries 
that are split in to
*discrete periods of execution. For non-streaming 
queries,
*this ID will always be 0.
*/
-  DataWriter createDataWriter(int partitionId, int attemptNumber, long 
epochId);
+  DataWriter createDataWriter(int partitionId, int taskId, long 
epochId);
--- End diff --

+1 for the type change.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21606: [SPARK-24552][core][SQL] Use task ID instead of a...

2018-06-22 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21606#discussion_r197541490
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -125,11 +124,11 @@ object DataWritingSparkTask extends Logging {
 val coordinator = SparkEnv.get.outputCommitCoordinator
 val commitAuthorized = coordinator.canCommit(stageId, 
stageAttempt, partId, attemptId)
 if (commitAuthorized) {
-  logInfo(s"Writer for stage $stageId / $stageAttempt, " +
+  logInfo(s"Writer for stage $stageId.$stageAttempt, " +
 s"task $partId.$attemptId is authorized to commit.")
   dataWriter.commit()
 } else {
-  val message = s"Stage $stageId / $stageAttempt, " +
+  val message = s"Stage $stageId.$stageAttempt, " +
--- End diff --

Should these logs use TID instead of attempt number? The format used in 
other log messages is `s"Task $taskId (TID $tid)"`, I think.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21606: [SPARK-24552][core][SQL] Use task ID instead of a...

2018-06-22 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21606#discussion_r197540970
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala ---
@@ -76,13 +76,17 @@ object SparkHadoopWriter extends Logging {
 // Try to write all RDD partitions as a Hadoop OutputFormat.
 try {
   val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: 
Iterator[(K, V)]) => {
+// SPARK-24552: Generate a unique "attempt ID" based on the stage 
and task atempt numbers.
+// Assumes that there won't be more than Short.MaxValue attempts, 
at least not concurrently.
+val attemptId = (context.stageAttemptNumber << 16) | 
context.attemptNumber
--- End diff --

I don't think we should generate an ID this way. We already have a unique 
ID that is exposed in the Spark UI. I'd much rather make it clear that the TID 
passed to committers as an attempt ID is the same as the TID in the stage view. 
That makes debugging easier. Going with this approach just introduces yet 
another number to track an attempt.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21558: [SPARK-24552][SQL] Use task ID instead of attempt number...

2018-06-22 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21558
  
@vanzin, thanks for working on this. I was out most of this week at a 
conference and I'm still on just half time, which is why I was delayed. Sorry 
to leave you all waiting. I'll make comments on your PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21558: [SPARK-24552][SQL] Use task ID instead of attempt number...

2018-06-18 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21558
  
Yes, I just checked and speculative attempts do get a different TID. Just 
turn on speculation, run a large stage, and sort tasks in a stage by TID. There 
aren't duplicates.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21558: [SPARK-24552][SQL] Use task ID instead of attempt number...

2018-06-18 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21558
  
Retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21574: [SPARK-24478][SQL][followup] Move projection and filter ...

2018-06-18 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21574
  
+1 (non-binding) assuming that tests pass.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21574: [SPARK-24478][SQL][followup] Move projection and ...

2018-06-18 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21574#discussion_r196223209
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -106,7 +106,7 @@ case class StreamingDataSourceV2Relation(
 
 object DataSourceV2Relation {
   private implicit class SourceHelpers(source: DataSourceV2) {
-def asReadSupport: ReadSupport = {
+private def asReadSupport: ReadSupport = {
--- End diff --

Minor: these are effectively private because `SourceHelpers` is private. If 
we were to move `SourceHelpers` or make it public to some other part of Spark, 
we would have to revert these changes. So I think it is best to rely on the 
visibility of `SourceHelpers` instead of making these private.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21574: [SPARK-24478][SQL][followup] Move projection and ...

2018-06-18 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21574#discussion_r196222500
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -23,17 +23,24 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema}
-import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsReportStatistics}
 import org.apache.spark.sql.types.StructType
 
+/**
+ * A logical plan representing a data source v2 scan.
+ *
+ * @param source An instance of a [[DataSourceV2]] implementation.
+ * @param options The options for this scan. Used to create fresh 
[[DataSourceReader]].
+ * @param userSpecifiedSchema The user-specified schema for this scan. 
Used to create fresh
+ *[[DataSourceReader]].
+ */
 case class DataSourceV2Relation(
 source: DataSourceV2,
 output: Seq[AttributeReference],
 options: Map[String, String],
-userSpecifiedSchema: Option[StructType] = None)
+userSpecifiedSchema: Option[StructType])
--- End diff --

Either way, it's up to you.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21558: [SPARK-24552][SQL] Use task ID instead of attempt number...

2018-06-18 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21558
  
@vanzin, the ID that this uses is the TID, which I thought was always 
unique. It appears to be a one-up counter. Also, I noted on your PR that both 
are needed because even if we only commit one of the attempts, the writers may 
use this ID to track and clean up data.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21577: [SPARK-24552][core] Correctly identify tasks in o...

2018-06-18 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21577#discussion_r196217742
  
--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
@@ -399,7 +399,8 @@ private[spark] object JsonProtocol {
 ("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
 ("Accumulator Updates" -> accumUpdates)
   case taskCommitDenied: TaskCommitDenied =>
-("Job ID" -> taskCommitDenied.jobID) ~
+("Job ID" -> taskCommitDenied.stageID) ~
+("Job Attempt Number" -> taskCommitDenied.stageAttempt) ~
--- End diff --

Also, will this affect the compatibility of the history server files?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21577: [SPARK-24552][core] Correctly identify tasks in output c...

2018-06-18 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21577
  
+1. This fixes the commit coordinator problem where two separate tasks can 
be authorized. That case could lead to duplicate data (if, for example, both 
tasks generated unique file names using a random UUID).

However, this doesn't address the problem I hit in practice, where a file 
was created twice and deleted once because the same task attempt number was 
both allowed to commit by the coordinator and denied commit by the coordinator 
(after the stage had finished).

We still need the solution proposed in 
https://github.com/apache/spark/pull/21558 for the v2 API. But that's more of a 
v2 API problem because that API makes the guarantee that implementations can 
rely on the attempt ID.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21577: [SPARK-24552][core] Correctly identify tasks in o...

2018-06-18 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21577#discussion_r196215961
  
--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
@@ -399,7 +399,8 @@ private[spark] object JsonProtocol {
 ("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
 ("Accumulator Updates" -> accumUpdates)
   case taskCommitDenied: TaskCommitDenied =>
-("Job ID" -> taskCommitDenied.jobID) ~
+("Job ID" -> taskCommitDenied.stageID) ~
+("Job Attempt Number" -> taskCommitDenied.stageAttempt) ~
--- End diff --

Why does this use "Job" and not "Stage"?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21577: [SPARK-24552][core] Correctly identify tasks in o...

2018-06-18 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21577#discussion_r196214944
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -131,16 +139,17 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
 reason match {
   case Success =>
   // The task output has been committed successfully
-  case denied: TaskCommitDenied =>
-logInfo(s"Task was denied committing, stage: $stage, partition: 
$partition, " +
-  s"attempt: $attemptNumber")
-  case otherReason =>
+  case _: TaskCommitDenied =>
+logInfo(s"Task was denied committing, stage: $stage / 
$stageAttempt, " +
--- End diff --

Nit: Should this be `s"$stage.$stageAttempt"`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21577: [SPARK-24552][core] Correctly identify tasks in o...

2018-06-18 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21577#discussion_r196214788
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -131,16 +139,17 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
 reason match {
   case Success =>
   // The task output has been committed successfully
-  case denied: TaskCommitDenied =>
-logInfo(s"Task was denied committing, stage: $stage, partition: 
$partition, " +
-  s"attempt: $attemptNumber")
-  case otherReason =>
+  case _: TaskCommitDenied =>
+logInfo(s"Task was denied committing, stage: $stage / 
$stageAttempt, " +
+  s"partition: $partition, attempt: $attemptNumber")
+  case _ =>
 // Mark the attempt as failed to blacklist from future commit 
protocol
-stageState.failures.getOrElseUpdate(partition, mutable.Set()) += 
attemptNumber
-if (stageState.authorizedCommitters(partition) == attemptNumber) {
+val taskId = TaskIdentifier(stageAttempt, attemptNumber)
+stageState.failures.getOrElseUpdate(partition, mutable.Set()) += 
taskId
+if (stageState.authorizedCommitters(partition) == taskId) {
   logDebug(s"Authorized committer (attemptNumber=$attemptNumber, 
stage=$stage, " +
 s"partition=$partition) failed; clearing lock")
-  stageState.authorizedCommitters(partition) = 
NO_AUTHORIZED_COMMITTER
+  stageState.authorizedCommitters(partition) = null
--- End diff --

Nit: why not use Option[TaskIdentifier] and None here?
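
i.e. something like this (a sketch, assuming `authorizedCommitters` held 
`Option[TaskIdentifier]`):

```
// Sketch: None stands for "no authorized committer" instead of a null sentinel.
stageState.authorizedCommitters(partition) match {
  case Some(committer) if committer == taskId =>
    logDebug(s"Authorized committer $taskId (stage=$stage, partition=$partition) " +
      "failed; clearing lock")
    stageState.authorizedCommitters(partition) = None
  case _ => // a different attempt (or none) was authorized; nothing to clear
}
```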


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21558: [SPARK-24552][SQL] Use task ID instead of attempt number...

2018-06-18 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21558
  
I think the right thing to do for this commit is to use the task ID instead 
of the stage-local attempt number. I've updated the PR with the change so I 
think this is ready to commit. @vanzin, are you okay committing this?

cc @cloud-fan


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21558: [SPARK-24552][SQL] Use task ID instead of attempt number...

2018-06-18 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21558
  
@tgravescs, that's exactly what we're seeing. I think it might just be 
misleading to have a stage-local attempt ID although it is more friendly for 
users and matches what MR does.

@jiangxb1987, we see SPARK-24492 occasionally (it has gotten better with 
later fixes to the coordinator) and haven't tracked down the cause yet. If this 
were the underlying cause that would be great, but I'm not sure how it could be 
the cause. If the same attempt number is reused, then two tasks in different 
stage attempts may both be authorized to commit. That wouldn't cause the 
retries because it wouldn't cause extra commit denials.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21574: [SPARK-24478][SQL][followup] Move projection and ...

2018-06-18 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21574#discussion_r196192774
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -23,17 +23,24 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema}
-import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsReportStatistics}
 import org.apache.spark.sql.types.StructType
 
+/**
+ * A logical plan representing a data source v2 scan.
+ *
+ * @param source An instance of a [[DataSourceV2]] implementation.
+ * @param options The options for this scan. Used to create fresh 
[[DataSourceReader]].
+ * @param userSpecifiedSchema The user-specified schema for this scan. 
Used to create fresh
+ *[[DataSourceReader]].
+ */
 case class DataSourceV2Relation(
 source: DataSourceV2,
 output: Seq[AttributeReference],
 options: Map[String, String],
-userSpecifiedSchema: Option[StructType] = None)
+userSpecifiedSchema: Option[StructType])
--- End diff --

That's because there are few places that create v2 relations so far, but 
when SQL statements and other paths that don't allow you to supply your own 
schema are added, I think this will be more common. It's okay to remove it, but 
I don't see much value in the change and I like to keep non-functional changes 
to a minimum.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21503: [SPARK-24478][SQL] Move projection and filter push down ...

2018-06-18 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21503
  
Thank you for reviewing this, @cloud-fan!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21574: [SPARK-24478][SQL][followup] Move projection and ...

2018-06-18 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21574#discussion_r196173414
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -23,17 +23,24 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema}
-import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsReportStatistics}
 import org.apache.spark.sql.types.StructType
 
+/**
+ * A logical plan representing a data source v2 scan.
+ *
+ * @param source An instance of a [[DataSourceV2]] implementation.
+ * @param options The options for this scan. Used to create fresh 
[[DataSourceReader]].
+ * @param userSpecifiedSchema The user-specified schema for this scan. 
Used to create fresh
+ *[[DataSourceReader]].
+ */
 case class DataSourceV2Relation(
 source: DataSourceV2,
 output: Seq[AttributeReference],
 options: Map[String, String],
-userSpecifiedSchema: Option[StructType] = None)
+userSpecifiedSchema: Option[StructType])
--- End diff --

Why is this change necessary?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


