[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-09 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r201190701
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -19,187 +19,200 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.sql.Date
 
+import scala.collection.JavaConverters.asScalaBufferConverter
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema.{DecimalMetadata, MessageType, 
OriginalType, PrimitiveComparator, PrimitiveType}
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
 
+  private case class ParquetSchemaType(
+  originalType: OriginalType,
+  primitiveTypeName: PrimitiveTypeName,
+  decimalMetadata: DecimalMetadata)
+
+  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
+  private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
+  private val ParquetLongType = ParquetSchemaType(null, INT64, null)
+  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
+  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
+  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
+  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
+  private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
+
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
-  private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
+case ParquetBooleanType =>
   (n: String, v: Any) => FilterApi.eq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+case ParquetIntegerType =>
   (n: String, v: Any) => FilterApi.eq(intColumn(n), 
v.asInstanceOf[Integer])
-case LongType =>
+case ParquetLongType =>
   (n: String, v: Any) => FilterApi.eq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
-case FloatType =>
+case ParquetFloatType =>
   (n: String, v: Any) => FilterApi.eq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
-case DoubleType =>
+case ParquetDoubleType =>
   (n: String, v: Any) => FilterApi.eq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
 
 // Binary.fromString and Binary.fromByteArray don't accept null values
-case StringType =>
+case ParquetStringType =>
   (n: String, v: Any) => FilterApi.eq(
 binaryColumn(n),
 Option(v).map(s => 
Binary.fromString(s.asInstanceOf[String])).orNull)
-case BinaryType =>
+case ParquetBinaryType =>
   (n: String, v: Any) => FilterApi.eq(
 binaryColumn(n),
 Option(v).map(b => 
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-case DateType if pushDownDate =>
+case ParquetDateType if pushDownDate =>
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
-  private val makeNotEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) 
=> FilterPredicate] = {
+case ParquetBooleanType =>
   (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+case ParquetIntegerType =>
   (n: String, v: Any) => FilterApi.notEq(intColumn(n), 
v.asInstanceOf[Integer])
-case LongType =>
+case ParquetLongType =>
   (n: String, v: Any) => FilterApi.notEq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
-case FloatType =>
+case ParquetFloatType =>
   (n: String, v: Any) => 

[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r201069421
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -19,187 +19,200 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.sql.Date
 
+import scala.collection.JavaConverters.asScalaBufferConverter
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema.{DecimalMetadata, MessageType, 
OriginalType, PrimitiveComparator, PrimitiveType}
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
 
+  private case class ParquetSchemaType(
+  originalType: OriginalType,
+  primitiveTypeName: PrimitiveTypeName,
+  decimalMetadata: DecimalMetadata)
+
+  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
+  private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
+  private val ParquetLongType = ParquetSchemaType(null, INT64, null)
+  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
+  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
+  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
+  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
+  private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
+
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
-  private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
+case ParquetBooleanType =>
   (n: String, v: Any) => FilterApi.eq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+case ParquetIntegerType =>
   (n: String, v: Any) => FilterApi.eq(intColumn(n), 
v.asInstanceOf[Integer])
-case LongType =>
+case ParquetLongType =>
   (n: String, v: Any) => FilterApi.eq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
-case FloatType =>
+case ParquetFloatType =>
   (n: String, v: Any) => FilterApi.eq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
-case DoubleType =>
+case ParquetDoubleType =>
   (n: String, v: Any) => FilterApi.eq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
 
 // Binary.fromString and Binary.fromByteArray don't accept null values
-case StringType =>
+case ParquetStringType =>
   (n: String, v: Any) => FilterApi.eq(
 binaryColumn(n),
 Option(v).map(s => 
Binary.fromString(s.asInstanceOf[String])).orNull)
-case BinaryType =>
+case ParquetBinaryType =>
   (n: String, v: Any) => FilterApi.eq(
 binaryColumn(n),
 Option(v).map(b => 
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-case DateType if pushDownDate =>
+case ParquetDateType if pushDownDate =>
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
-  private val makeNotEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) 
=> FilterPredicate] = {
+case ParquetBooleanType =>
   (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+case ParquetIntegerType =>
   (n: String, v: Any) => FilterApi.notEq(intColumn(n), 
v.asInstanceOf[Integer])
-case LongType =>
+case ParquetLongType =>
   (n: String, v: Any) => FilterApi.notEq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
-case FloatType =>
+case ParquetFloatType =>
   (n: String, v: Any) => 

[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21696





[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-03 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r200011264
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -19,187 +19,200 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.sql.Date
 
+import scala.collection.JavaConverters.asScalaBufferConverter
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema.{DecimalMetadata, MessageType, 
OriginalType, PrimitiveComparator, PrimitiveType}
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
 
+  private case class ParquetSchemaType(
+  originalType: OriginalType,
+  primitiveTypeName: PrimitiveTypeName,
+  decimalMetadata: DecimalMetadata)
+
+  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
+  private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
+  private val ParquetLongType = ParquetSchemaType(null, INT64, null)
+  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
+  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
+  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
+  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
+  private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
+
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
-  private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
+case ParquetBooleanType =>
   (n: String, v: Any) => FilterApi.eq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+case ParquetIntegerType =>
   (n: String, v: Any) => FilterApi.eq(intColumn(n), 
v.asInstanceOf[Integer])
-case LongType =>
+case ParquetLongType =>
   (n: String, v: Any) => FilterApi.eq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
-case FloatType =>
+case ParquetFloatType =>
   (n: String, v: Any) => FilterApi.eq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
-case DoubleType =>
+case ParquetDoubleType =>
   (n: String, v: Any) => FilterApi.eq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
 
 // Binary.fromString and Binary.fromByteArray don't accept null values
-case StringType =>
+case ParquetStringType =>
   (n: String, v: Any) => FilterApi.eq(
 binaryColumn(n),
 Option(v).map(s => 
Binary.fromString(s.asInstanceOf[String])).orNull)
-case BinaryType =>
+case ParquetBinaryType =>
   (n: String, v: Any) => FilterApi.eq(
 binaryColumn(n),
 Option(v).map(b => 
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-case DateType if pushDownDate =>
+case ParquetDateType if pushDownDate =>
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
-  private val makeNotEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) 
=> FilterPredicate] = {
+case ParquetBooleanType =>
   (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+case ParquetIntegerType =>
   (n: String, v: Any) => FilterApi.notEq(intColumn(n), 
v.asInstanceOf[Integer])
-case LongType =>
+case ParquetLongType =>
   (n: String, v: Any) => FilterApi.notEq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
-case FloatType =>
+case ParquetFloatType =>
   (n: String, v: Any) => 

[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-03 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r19002
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -19,166 +19,186 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.sql.Date
 
+import scala.collection.JavaConverters._
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
 
+  case class ParquetSchemaType(
+  originalType: OriginalType,
+  primitiveTypeName: PrimitiveType.PrimitiveTypeName,
+  decimalMetadata: DecimalMetadata)
+
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
-  private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
+// BooleanType
+case ParquetSchemaType(null, BOOLEAN, null) =>
   (n: String, v: Any) => FilterApi.eq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+// IntegerType
+case ParquetSchemaType(null, INT32, null) =>
   (n: String, v: Any) => FilterApi.eq(intColumn(n), 
v.asInstanceOf[Integer])
-case LongType =>
+// LongType
+case ParquetSchemaType(null, INT64, null) =>
   (n: String, v: Any) => FilterApi.eq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
-case FloatType =>
+// FloatType
+case ParquetSchemaType(null, FLOAT, null) =>
   (n: String, v: Any) => FilterApi.eq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
-case DoubleType =>
+// DoubleType
+case ParquetSchemaType(null, DOUBLE, null) =>
   (n: String, v: Any) => FilterApi.eq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
 
+// StringType
 // Binary.fromString and Binary.fromByteArray don't accept null values
-case StringType =>
+case ParquetSchemaType(UTF8, BINARY, null) =>
   (n: String, v: Any) => FilterApi.eq(
 binaryColumn(n),
 Option(v).map(s => 
Binary.fromString(s.asInstanceOf[String])).orNull)
-case BinaryType =>
+// BinaryType
+case ParquetSchemaType(null, BINARY, null) =>
   (n: String, v: Any) => FilterApi.eq(
 binaryColumn(n),
 Option(v).map(b => 
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-case DateType if pushDownDate =>
+// DateType
+case ParquetSchemaType(DATE, INT32, null) if pushDownDate =>
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
-  private val makeNotEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) 
=> FilterPredicate] = {
+case ParquetSchemaType(null, BOOLEAN, null) =>
   (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+case ParquetSchemaType(null, INT32, null) =>
   (n: String, v: Any) => FilterApi.notEq(intColumn(n), 
v.asInstanceOf[Integer])
-case LongType =>
+case ParquetSchemaType(null, INT64, null) =>
   (n: String, v: Any) => FilterApi.notEq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
-case FloatType =>
+case ParquetSchemaType(null, FLOAT, null) =>
   (n: String, v: Any) => FilterApi.notEq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
-case DoubleType =>
+case ParquetSchemaType(null, DOUBLE, null) =>
   (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
 
-case StringType =>
+case ParquetSchemaType(UTF8, BINARY, null) =>
   (n: String, v: Any) => FilterApi.notEq(

[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r11214
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -19,166 +19,186 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.sql.Date
 
+import scala.collection.JavaConverters._
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
 
+  case class ParquetSchemaType(
+  originalType: OriginalType,
+  primitiveTypeName: PrimitiveType.PrimitiveTypeName,
+  decimalMetadata: DecimalMetadata)
+
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
-  private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
+// BooleanType
+case ParquetSchemaType(null, BOOLEAN, null) =>
   (n: String, v: Any) => FilterApi.eq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+// IntegerType
+case ParquetSchemaType(null, INT32, null) =>
   (n: String, v: Any) => FilterApi.eq(intColumn(n), 
v.asInstanceOf[Integer])
-case LongType =>
+// LongType
+case ParquetSchemaType(null, INT64, null) =>
   (n: String, v: Any) => FilterApi.eq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
-case FloatType =>
+// FloatType
+case ParquetSchemaType(null, FLOAT, null) =>
   (n: String, v: Any) => FilterApi.eq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
-case DoubleType =>
+// DoubleType
+case ParquetSchemaType(null, DOUBLE, null) =>
   (n: String, v: Any) => FilterApi.eq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
 
+// StringType
 // Binary.fromString and Binary.fromByteArray don't accept null values
-case StringType =>
+case ParquetSchemaType(UTF8, BINARY, null) =>
   (n: String, v: Any) => FilterApi.eq(
 binaryColumn(n),
 Option(v).map(s => 
Binary.fromString(s.asInstanceOf[String])).orNull)
-case BinaryType =>
+// BinaryType
+case ParquetSchemaType(null, BINARY, null) =>
   (n: String, v: Any) => FilterApi.eq(
 binaryColumn(n),
 Option(v).map(b => 
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-case DateType if pushDownDate =>
+// DateType
+case ParquetSchemaType(DATE, INT32, null) if pushDownDate =>
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
-  private val makeNotEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) 
=> FilterPredicate] = {
+case ParquetSchemaType(null, BOOLEAN, null) =>
   (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+case ParquetSchemaType(null, INT32, null) =>
   (n: String, v: Any) => FilterApi.notEq(intColumn(n), 
v.asInstanceOf[Integer])
-case LongType =>
+case ParquetSchemaType(null, INT64, null) =>
   (n: String, v: Any) => FilterApi.notEq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
-case FloatType =>
+case ParquetSchemaType(null, FLOAT, null) =>
   (n: String, v: Any) => FilterApi.notEq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
-case DoubleType =>
+case ParquetSchemaType(null, DOUBLE, null) =>
   (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
 
-case StringType =>
+case ParquetSchemaType(UTF8, BINARY, null) =>
   (n: String, v: Any) => 

[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r11109
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -19,166 +19,186 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.sql.Date
 
+import scala.collection.JavaConverters._
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
 
+  case class ParquetSchemaType(
+  originalType: OriginalType,
+  primitiveTypeName: PrimitiveType.PrimitiveTypeName,
+  decimalMetadata: DecimalMetadata)
+
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
-  private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
+// BooleanType
+case ParquetSchemaType(null, BOOLEAN, null) =>
   (n: String, v: Any) => FilterApi.eq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+// IntegerType
+case ParquetSchemaType(null, INT32, null) =>
--- End diff --

The safest way is to look at both the file's type and Spark's type, and deal with any type mismatch. We can do it later since it's an existing problem. Currently Spark tries its best to guarantee that the types match (except for missing/extra columns). The only case I can think of that may break the assumption is: the Parquet files have conflicting schemas and Spark reads them using a user-specified schema (so that schema inference can be skipped) that doesn't match all of the Parquet files.
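
A hypothetical spark-shell sketch of that corner case (the path and the column name `c` are illustrative, not from this PR): two writes leave part-files whose physical types conflict, and a user-specified schema skips the inference that would otherwise surface the mismatch.

```scala
// Hypothetical spark-shell sketch; path and column name are illustrative only.
import org.apache.spark.sql.types.{LongType, StructType}

// Two writes leave part-files with conflicting physical types (INT32 vs INT64)
// in the same directory.
spark.range(10).selectExpr("cast(id as int) as c").write.parquet("/tmp/conflicting")
spark.range(10).selectExpr("cast(id as bigint) as c").write.mode("append").parquet("/tmp/conflicting")

// A user-specified schema skips inference, so the mismatch is never reconciled.
val userSchema = new StructType().add("c", LongType)

// A predicate pushed down from `userSchema` (INT64) does not line up with the
// part-files that physically store `c` as INT32.
spark.read.schema(userSchema).parquet("/tmp/conflicting").where("c = 5").show()
```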





[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199980993
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -19,166 +19,186 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.sql.Date
 
+import scala.collection.JavaConverters._
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
 
+  case class ParquetSchemaType(
+  originalType: OriginalType,
+  primitiveTypeName: PrimitiveType.PrimitiveTypeName,
+  decimalMetadata: DecimalMetadata)
+
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
-  private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
+// BooleanType
+case ParquetSchemaType(null, BOOLEAN, null) =>
   (n: String, v: Any) => FilterApi.eq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+// IntegerType
+case ParquetSchemaType(null, INT32, null) =>
   (n: String, v: Any) => FilterApi.eq(intColumn(n), 
v.asInstanceOf[Integer])
-case LongType =>
+// LongType
+case ParquetSchemaType(null, INT64, null) =>
   (n: String, v: Any) => FilterApi.eq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
-case FloatType =>
+// FloatType
+case ParquetSchemaType(null, FLOAT, null) =>
   (n: String, v: Any) => FilterApi.eq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
-case DoubleType =>
+// DoubleType
+case ParquetSchemaType(null, DOUBLE, null) =>
   (n: String, v: Any) => FilterApi.eq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
 
+// StringType
 // Binary.fromString and Binary.fromByteArray don't accept null values
-case StringType =>
+case ParquetSchemaType(UTF8, BINARY, null) =>
   (n: String, v: Any) => FilterApi.eq(
 binaryColumn(n),
 Option(v).map(s => 
Binary.fromString(s.asInstanceOf[String])).orNull)
-case BinaryType =>
+// BinaryType
+case ParquetSchemaType(null, BINARY, null) =>
   (n: String, v: Any) => FilterApi.eq(
 binaryColumn(n),
 Option(v).map(b => 
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-case DateType if pushDownDate =>
+// DateType
+case ParquetSchemaType(DATE, INT32, null) if pushDownDate =>
   (n: String, v: Any) => FilterApi.eq(
 intColumn(n),
 Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
-  private val makeNotEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) 
=> FilterPredicate] = {
+case ParquetSchemaType(null, BOOLEAN, null) =>
   (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+case ParquetSchemaType(null, INT32, null) =>
   (n: String, v: Any) => FilterApi.notEq(intColumn(n), 
v.asInstanceOf[Integer])
-case LongType =>
+case ParquetSchemaType(null, INT64, null) =>
   (n: String, v: Any) => FilterApi.notEq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
-case FloatType =>
+case ParquetSchemaType(null, FLOAT, null) =>
   (n: String, v: Any) => FilterApi.notEq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
-case DoubleType =>
+case ParquetSchemaType(null, DOUBLE, null) =>
   (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
 
-case StringType =>
+case ParquetSchemaType(UTF8, BINARY, null) =>
   (n: String, v: Any) => FilterApi.notEq(
 

[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199980897
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -19,166 +19,186 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.sql.Date
 
+import scala.collection.JavaConverters._
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
 
+  case class ParquetSchemaType(
+  originalType: OriginalType,
+  primitiveTypeName: PrimitiveType.PrimitiveTypeName,
+  decimalMetadata: DecimalMetadata)
+
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
-  private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
+// BooleanType
--- End diff --

The other partial functions don't have these comments. Is that on purpose? 
Maybe these should be constants instead to make the code more readable and 
consistent?





[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199980632
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -19,166 +19,186 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.sql.Date
 
+import scala.collection.JavaConverters._
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
 
+  case class ParquetSchemaType(
+  originalType: OriginalType,
+  primitiveTypeName: PrimitiveType.PrimitiveTypeName,
+  decimalMetadata: DecimalMetadata)
+
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
-  private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
+// BooleanType
+case ParquetSchemaType(null, BOOLEAN, null) =>
   (n: String, v: Any) => FilterApi.eq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
-case IntegerType =>
+// IntegerType
+case ParquetSchemaType(null, INT32, null) =>
--- End diff --

Before, it was a valid assumption that the value's type matched the `DataType`. Now that this is the file's type, that might not be the case. For example, byte and short are stored as INT32. This should cast to `Number` and then convert to the file's type.

I would also do this for INT64 columns, in case the schema has evolved and a column that was INT32 is now INT64. The converters (used to materialize records) don't currently support this, but it would be reasonable for them to support it eventually.
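
A minimal sketch of the widening suggested here, with hypothetical helper names (`toIntValue` and `toLongValue` are not the PR's API): treat the pushed-down value as a `java.lang.Number` and convert it to the file's physical type instead of assuming the runtime class already matches.

```scala
// Hedged sketch only; helper names are hypothetical, not part of the PR.
object ParquetValueWidening {
  // Widens Byte, Short, and Integer literals to the INT32 physical type.
  def toIntValue(v: Any): java.lang.Integer = v match {
    case n: java.lang.Number => java.lang.Integer.valueOf(n.intValue())
    case null => null
    case other => throw new IllegalArgumentException(s"Not an INT32-compatible value: $other")
  }

  // Also covers a column that has evolved from INT32 to INT64.
  def toLongValue(v: Any): java.lang.Long = v match {
    case n: java.lang.Number => java.lang.Long.valueOf(n.longValue())
    case null => null
    case other => throw new IllegalArgumentException(s"Not an INT64-compatible value: $other")
  }
}
```

With helpers like these, something along the lines of `FilterApi.eq(intColumn(n), toIntValue(v))` keeps a byte or short literal usable against an INT32 column.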





[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199979463
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -379,14 +366,29 @@ class ParquetFileFormat
   null)
 
   val sharedConf = broadcastedHadoopConf.value.value
+
+  val fileMetaData =
+ParquetFileReader.readFooter(sharedConf, fileSplit.getPath, 
SKIP_ROW_GROUPS).getFileMetaData
--- End diff --

We should *always* read the footer and adjust the filters to match. In our 
version we use the following to handle situations where the column names have 
changed case or where Hive has returned a different case:

```scala
  // Try to push down filters when filter push-down is enabled.
  val pushed = if (pushdownEnabled) {
    // read the file schema to create Parquet filters that match case
    val fileReader = ParquetFileReader.open(conf, fileSplit.getPath)
    val fileSchema = try {
      new ParquetToSparkSchemaConverter(conf).convert(
        ParquetReadSupport.clipParquetSchema(
          fileReader.getFileMetaData.getSchema, requiredSchema))
    } finally {
      fileReader.close()
    }

    filters
      // Collects all converted Parquet filter predicates. Notice that not all
      // predicates can be converted (`ParquetFilters.createFilter` returns an
      // `Option`). That's why a `flatMap` is used here.
      .flatMap(ParquetFilters.createFilter(fileSchema, _))
      .reduceOption(FilterApi.and)
  } else {
    None
  }
```

That's pretty much the same thing as here. In addition, if columns can be 
renamed within a Parquet file then you need to push filters for the names used 
in the file's schema. I don't think that's a problem here because I don't know 
of a way to rename columns in a Spark Parquet table. (Iceberg uses field IDs to 
do this.)





[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-02 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199686314
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -379,14 +366,29 @@ class ParquetFileFormat
   null)
 
   val sharedConf = broadcastedHadoopConf.value.value
+
+  val fileMetaData =
+ParquetFileReader.readFooter(sharedConf, fileSplit.getPath, 
SKIP_ROW_GROUPS).getFileMetaData
--- End diff --

Yes, I think so. It should be avoided. `isCreatedByParquetMr` was intentionally made a function so that short-circuiting avoids the extra footer read.
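
A self-contained sketch of that short-circuiting point; `readFooterCreatedBy` and the flag below are illustrative stand-ins, not Spark's real identifiers.

```scala
// Sketch only: keeping the expensive footer read behind a def means it only
// happens when the boolean is actually consulted.
object ShortCircuitSketch {
  var footerReads = 0

  // Stands in for the expensive footer read.
  def readFooterCreatedBy(): String = {
    footerReads += 1
    "parquet-mr version 1.10.0"
  }

  // A def, not a val: nothing is read until the flag is actually consulted.
  def isCreatedByParquetMr: Boolean = readFooterCreatedBy().startsWith("parquet-mr")

  def main(args: Array[String]): Unit = {
    val timestampConversionEnabled = false
    // && short-circuits, so the footer is never read while the feature is off.
    val needsConversion = timestampConversionEnabled && !isCreatedByParquetMr
    println(s"needsConversion=$needsConversion, footerReads=$footerReads") // footerReads stays 0
  }
}
```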





[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-02 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199678338
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -379,14 +366,29 @@ class ParquetFileFormat
   null)
 
   val sharedConf = broadcastedHadoopConf.value.value
+
+  val fileMetaData =
+ParquetFileReader.readFooter(sharedConf, fileSplit.getPath, 
SKIP_ROW_GROUPS).getFileMetaData
--- End diff --

Will we read the footer again in the Parquet reader?





[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-02 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199672805
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -19,166 +19,186 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.sql.Date
 
+import scala.collection.JavaConverters._
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
 
+  case class ParquetSchemaType(
+  originalType: OriginalType,
+  primitiveTypeName: PrimitiveType.PrimitiveTypeName,
+  decimalMetadata: DecimalMetadata)
+
   private def dateToDays(date: Date): SQLDate = {
 DateTimeUtils.fromJavaDate(date)
   }
 
-  private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
-case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
+// BooleanType
+case ParquetSchemaType(null, BOOLEAN, null) =>
--- End diff --

Type mapping, for reference:

https://github.com/apache/spark/blob/21a7bfd5c324e6c82152229f1394f26afeae771c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala#L338-L560
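
A spark-shell style condensation of that mapping, limited to the types touched in this diff (decimal, timestamp, and nested types are omitted, and `DecimalMetadata` is left out):

```scala
// Condensed sketch of the Catalyst -> Parquet mapping linked above.
import org.apache.parquet.schema.OriginalType
import org.apache.parquet.schema.OriginalType.{DATE, UTF8}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.spark.sql.types._

val catalystToParquet: Map[DataType, (PrimitiveTypeName, Option[OriginalType])] = Map(
  (BooleanType, (BOOLEAN, None)),
  (IntegerType, (INT32, None)),
  (LongType, (INT64, None)),
  (FloatType, (FLOAT, None)),
  (DoubleType, (DOUBLE, None)),
  (StringType, (BINARY, Some(UTF8))),
  (BinaryType, (BINARY, None)),
  (DateType, (INT32, Some(DATE)))
)
```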





[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters

2018-07-02 Thread wangyum
GitHub user wangyum opened a pull request:

https://github.com/apache/spark/pull/21696

[SPARK-24716][SQL] Refactor ParquetFilters

## What changes were proposed in this pull request?

Replace the DataFrame schema with the Parquet file schema when creating `ParquetFilters`. More details will be added later.
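
A hedged sketch of the intent, under assumed signatures (the final `createFilter` shape in the PR may differ): predicates are built against the file's own `MessageType`, so the pushed-down filter matches the physical column types actually stored in each file.

```scala
// Hedged sketch only; the method shape is assumed, not the PR's final API.
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.sources

object FileSchemaFilterSketch {
  // Build a Parquet predicate from the file schema rather than the Catalyst schema.
  def createFilter(fileSchema: MessageType, predicate: sources.Filter): Option[FilterPredicate] =
    predicate match {
      // Only push down when the file actually contains the column.
      case sources.EqualTo(name, value: Integer) if fileSchema.containsField(name) =>
        Some(FilterApi.eq(FilterApi.intColumn(name), value))
      case _ => None // anything unsupported is simply not pushed down
    }
}
```

rdblue's review comment earlier in this thread shows essentially the same pattern in use, with the file schema read from the footer and clipped to the required columns.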


## How was this patch tested?

unit tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangyum/spark SPARK-24716

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21696.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21696


commit 10d408fd3fe429d5529755b5abebac86b22b6d55
Author: Yuming Wang 
Date:   2018-07-02T09:36:46Z

Refactor ParquetFilters



