[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20387


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r169239000
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,80 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Seq[AttributeReference],
+filters: Option[Seq[Expression]] = None,
+userSpecifiedSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  import DataSourceV2Relation._
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=${source.name}, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+// use the projection attributes to avoid assigning new ids. fields 
that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = projection.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
+  }
+
+  private lazy val v2Options: DataSourceOptions = makeV2Options(options)
+
+  lazy val (
+  reader: DataSourceReader,
+  unsupportedFilters: Seq[Expression],
+  pushedFilters: Seq[Expression]) = {
+val newReader = userSpecifiedSchema match {
+  case Some(s) =>
+source.asReadSupportWithSchema.createReader(s, v2Options)
+  case _ =>
+source.asReadSupport.createReader(v2Options)
+}
+
+DataSourceV2Relation.pushRequiredColumns(newReader, 
projection.toStructType)
 
-  override def canEqual(other: Any): Boolean = 
other.isInstanceOf[DataSourceV2Relation]
+val (remainingFilters, pushedFilters) = filters match {
+  case Some(filterSeq) =>
+DataSourceV2Relation.pushFilters(newReader, filterSeq)
+  case _ =>
+(Nil, Nil)
+}
+
+(newReader, remainingFilters, pushedFilters)
+  }
+
+  override def doCanonicalize(): LogicalPlan = {
+val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation]
+
+// override output with canonicalized output to avoid attempting to 
configure a reader
+val canonicalOutput: Seq[AttributeReference] = this.output
+.map(a => QueryPlan.normalizeExprId(a, projection))
+
+new DataSourceV2Relation(c.source, c.options, c.projection) {
--- End diff --

This is hacky, but I don't have a better idea right now; let's revisit it later.
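
For context, a self-contained sketch of the trick being discussed: canonicalization returns a copy of the relation whose `output` is overridden up front with normalized attributes, so comparing canonicalized plans never forces the lazily created reader. All names below (`ToyAttr`, `ToyRelation`, the toy `reader`) are illustrative stand-ins, not Spark's actual classes.

```scala
// Toy stand-ins for AttributeReference / DataSourceV2Relation / DataSourceReader.
case class ToyAttr(name: String, id: Long)

class ToyRelation(val source: String, val projection: Seq[ToyAttr]) {
  // Stands in for createReader(): expensive, and a side effect we want to
  // avoid when the only goal is comparing two plans for equality.
  lazy val reader: String = {
    println(s"configuring reader for $source")
    s"reader($source)"
  }

  // Normally output is derived through the reader's schema.
  def output: Seq[ToyAttr] = { reader; projection }

  // The "hacky" part: return a copy whose output is pinned to a normalized
  // form, so canonicalized plans never touch `reader` at all.
  def canonicalized: ToyRelation = {
    val normalized = projection.zipWithIndex.map { case (a, i) => a.copy(id = i.toLong) }
    new ToyRelation(source, normalized) {
      override def output: Seq[ToyAttr] = normalized
    }
  }
}

object CanonicalizeDemo extends App {
  val r = new ToyRelation("t", Seq(ToyAttr("x", 42L), ToyAttr("y", 7L)))
  // Prints "true" and never prints "configuring reader for t".
  println(r.canonicalized.output == r.canonicalized.output)
}
```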


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-17 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r168933227
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
 ---
@@ -17,130 +17,55 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
AttributeMap, AttributeSet, Expression, NamedExpression, PredicateHelper}
-import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources
-import org.apache.spark.sql.sources.v2.reader._
 
-/**
- * Pushes down various operators to the underlying data source for better 
performance. Operators are
- * being pushed down with a specific order. As an example, given a LIMIT 
has a FILTER child, you
- * can't push down LIMIT if FILTER is not completely pushed down. When 
both are pushed down, the
- * data source should execute FILTER before LIMIT. And required columns 
are calculated at the end,
- * because when more operators are pushed down, we may need less columns 
at Spark side.
- */
-object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with 
PredicateHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-// Note that, we need to collect the target operator along with 
PROJECT node, as PROJECT may
-// appear in many places for column pruning.
-// TODO: Ideally column pruning should be implemented via a plan 
property that is propagated
-// top-down, then we can simplify the logic here and only collect 
target operators.
-val filterPushed = plan transformUp {
-  case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, 
reader)) =>
-val (candidates, nonDeterministic) =
-  splitConjunctivePredicates(condition).partition(_.deterministic)
-
-val stayUpFilters: Seq[Expression] = reader match {
-  case r: SupportsPushDownCatalystFilters =>
-r.pushCatalystFilters(candidates.toArray)
-
-  case r: SupportsPushDownFilters =>
-// A map from original Catalyst expressions to corresponding 
translated data source
-// filters. If a predicate is not in this map, it means it 
cannot be pushed down.
-val translatedMap: Map[Expression, sources.Filter] = 
candidates.flatMap { p =>
-  DataSourceStrategy.translateFilter(p).map(f => p -> f)
-}.toMap
-
-// Catalyst predicate expressions that cannot be converted to 
data source filters.
-val nonConvertiblePredicates = 
candidates.filterNot(translatedMap.contains)
-
-// Data source filters that cannot be pushed down. An 
unhandled filter means
-// the data source cannot guarantee the rows returned can pass 
the filter.
-// As a result we must return it so Spark can plan an extra 
filter operator.
-val unhandledFilters = 
r.pushFilters(translatedMap.values.toArray).toSet
-val unhandledPredicates = translatedMap.filter { case (_, f) =>
-  unhandledFilters.contains(f)
-}.keys
-
-nonConvertiblePredicates ++ unhandledPredicates
-
-  case _ => candidates
-}
-
-val filterCondition = (stayUpFilters ++ 
nonDeterministic).reduceLeftOption(And)
-val withFilter = filterCondition.map(Filter(_, r)).getOrElse(r)
-if (withFilter.output == fields) {
-  withFilter
-} else {
-  Project(fields, withFilter)
-}
-}
-
-// TODO: add more push down rules.
-
-val columnPruned = pushDownRequiredColumns(filterPushed, 
filterPushed.outputSet)
-// After column pruning, we may have redundant PROJECT nodes in the 
query plan, remove them.
-RemoveRedundantProject(columnPruned)
-  }
-
-  // TODO: nested fields pruning
-  private def pushDownRequiredColumns(
-  plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = {
-plan match {
-  case p @ Project(projectList, child) =>
-val required = projectList.flatMap(_.references)
-p.copy(child = pushDownRequiredColumns(child, 
AttributeSet(required)))
-
-  case f @ Filter(condition, child) =>
-val required = requiredByParent

[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r168910531
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
 ---
@@ -17,130 +17,55 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
AttributeMap, AttributeSet, Expression, NamedExpression, PredicateHelper}
-import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources
-import org.apache.spark.sql.sources.v2.reader._
 
-/**
- * Pushes down various operators to the underlying data source for better 
performance. Operators are
- * being pushed down with a specific order. As an example, given a LIMIT 
has a FILTER child, you
- * can't push down LIMIT if FILTER is not completely pushed down. When 
both are pushed down, the
- * data source should execute FILTER before LIMIT. And required columns 
are calculated at the end,
- * because when more operators are pushed down, we may need less columns 
at Spark side.
- */
-object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with 
PredicateHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-// Note that, we need to collect the target operator along with 
PROJECT node, as PROJECT may
-// appear in many places for column pruning.
-// TODO: Ideally column pruning should be implemented via a plan 
property that is propagated
-// top-down, then we can simplify the logic here and only collect 
target operators.
-val filterPushed = plan transformUp {
-  case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, 
reader)) =>
-val (candidates, nonDeterministic) =
-  splitConjunctivePredicates(condition).partition(_.deterministic)
-
-val stayUpFilters: Seq[Expression] = reader match {
-  case r: SupportsPushDownCatalystFilters =>
-r.pushCatalystFilters(candidates.toArray)
-
-  case r: SupportsPushDownFilters =>
-// A map from original Catalyst expressions to corresponding 
translated data source
-// filters. If a predicate is not in this map, it means it 
cannot be pushed down.
-val translatedMap: Map[Expression, sources.Filter] = 
candidates.flatMap { p =>
-  DataSourceStrategy.translateFilter(p).map(f => p -> f)
-}.toMap
-
-// Catalyst predicate expressions that cannot be converted to 
data source filters.
-val nonConvertiblePredicates = 
candidates.filterNot(translatedMap.contains)
-
-// Data source filters that cannot be pushed down. An 
unhandled filter means
-// the data source cannot guarantee the rows returned can pass 
the filter.
-// As a result we must return it so Spark can plan an extra 
filter operator.
-val unhandledFilters = 
r.pushFilters(translatedMap.values.toArray).toSet
-val unhandledPredicates = translatedMap.filter { case (_, f) =>
-  unhandledFilters.contains(f)
-}.keys
-
-nonConvertiblePredicates ++ unhandledPredicates
-
-  case _ => candidates
-}
-
-val filterCondition = (stayUpFilters ++ 
nonDeterministic).reduceLeftOption(And)
-val withFilter = filterCondition.map(Filter(_, r)).getOrElse(r)
-if (withFilter.output == fields) {
-  withFilter
-} else {
-  Project(fields, withFilter)
-}
-}
-
-// TODO: add more push down rules.
-
-val columnPruned = pushDownRequiredColumns(filterPushed, 
filterPushed.outputSet)
-// After column pruning, we may have redundant PROJECT nodes in the 
query plan, remove them.
-RemoveRedundantProject(columnPruned)
-  }
-
-  // TODO: nested fields pruning
-  private def pushDownRequiredColumns(
-  plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = {
-plan match {
-  case p @ Project(projectList, child) =>
-val required = projectList.flatMap(_.references)
-p.copy(child = pushDownRequiredColumns(child, 
AttributeSet(required)))
-
-  case f @ Filter(condition, child) =>
-val required = requiredByPar

[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r168910415
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -37,22 +100,147 @@ case class DataSourceV2Relation(
   }
 
   override def newInstance(): DataSourceV2Relation = {
-copy(output = output.map(_.newInstance()))
+// projection is used to maintain id assignment.
+// if projection is not set, use output so the copy is not equal to 
the original
+copy(projection = projection.map(_.newInstance()))
   }
 }
 
 /**
  * A specialization of DataSourceV2Relation with the streaming bit set to 
true. Otherwise identical
  * to the non-streaming relation.
  */
-class StreamingDataSourceV2Relation(
+case class StreamingDataSourceV2Relation(
 output: Seq[AttributeReference],
-reader: DataSourceReader) extends DataSourceV2Relation(output, reader) 
{
+reader: DataSourceReader)
+extends LeafNode with DataSourceReaderHolder with 
MultiInstanceRelation {
   override def isStreaming: Boolean = true
+
+  override def canEqual(other: Any): Boolean = 
other.isInstanceOf[StreamingDataSourceV2Relation]
+
+  override def newInstance(): LogicalPlan = copy(output = 
output.map(_.newInstance()))
+
+  override def computeStats(): Statistics = reader match {
+case r: SupportsReportStatistics =>
+  Statistics(sizeInBytes = 
r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
+case _ =>
+  Statistics(sizeInBytes = conf.defaultSizeInBytes)
+  }
 }
 
 object DataSourceV2Relation {
-  def apply(reader: DataSourceReader): DataSourceV2Relation = {
-new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
+  private implicit class SourceHelpers(source: DataSourceV2) {
+def asReadSupport: ReadSupport = {
+  source match {
+case support: ReadSupport =>
+  support
+case _: ReadSupportWithSchema =>
+  // this method is only called if there is no user-supplied 
schema. if there is no
+  // user-supplied schema and ReadSupport was not implemented, 
throw a helpful exception.
+  throw new AnalysisException(s"Data source requires a 
user-supplied schema: $name")
+case _ =>
+  throw new AnalysisException(s"Data source is not readable: 
$name")
+  }
+}
+
+def asReadSupportWithSchema: ReadSupportWithSchema = {
+  source match {
+case support: ReadSupportWithSchema =>
+  support
+case _: ReadSupport =>
+  throw new AnalysisException(
+s"Data source does not support user-supplied schema: $name")
--- End diff --

Here we should compare the schemas before throwing an exception.


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167754111
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -37,22 +100,129 @@ case class DataSourceV2Relation(
   }
 
   override def newInstance(): DataSourceV2Relation = {
-copy(output = output.map(_.newInstance()))
+// projection is used to maintain id assignment.
+// if projection is not set, use output so the copy is not equal to 
the original
+copy(projection = projection.map(_.newInstance()))
   }
 }
 
 /**
  * A specialization of DataSourceV2Relation with the streaming bit set to 
true. Otherwise identical
  * to the non-streaming relation.
  */
-class StreamingDataSourceV2Relation(
+case class StreamingDataSourceV2Relation(
 output: Seq[AttributeReference],
-reader: DataSourceReader) extends DataSourceV2Relation(output, reader) 
{
+reader: DataSourceReader)
+extends LeafNode with DataSourceReaderHolder with 
MultiInstanceRelation {
   override def isStreaming: Boolean = true
+
+  override def canEqual(other: Any): Boolean = 
other.isInstanceOf[StreamingDataSourceV2Relation]
+
+  override def newInstance(): LogicalPlan = copy(output = 
output.map(_.newInstance()))
 }
 
 object DataSourceV2Relation {
-  def apply(reader: DataSourceReader): DataSourceV2Relation = {
-new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
+  private implicit class SourceHelpers(source: DataSourceV2) {
+def asReadSupport: ReadSupport = {
+  source match {
+case support: ReadSupport =>
+  support
+case _: ReadSupportWithSchema =>
+  // this method is only called if there is no user-supplied 
schema. if there is no
+  // user-supplied schema and ReadSupport was not implemented, 
throw a helpful exception.
+  throw new AnalysisException(s"Data source requires a 
user-supplied schema: $name")
+case _ =>
+  throw new AnalysisException(s"Data source is not readable: 
$name")
+  }
+}
+
+def asReadSupportWithSchema: ReadSupportWithSchema = {
+  source match {
+case support: ReadSupportWithSchema =>
+  support
+case _: ReadSupport =>
--- End diff --

There was a historical reason we did this: 
https://github.com/apache/spark/pull/15046

I agree it's clearer to not allow this since data source v2 is brand new. But this change is worth a JIRA ticket and an individual PR; do you mind creating one? Or I can do that for you.


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167753210
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,80 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Seq[AttributeReference],
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
--- End diff --

Because we call it `userSpecifiedSchema` in `DataFrameReader` and `DataSource`, I think it's clearer to keep the name consistent.


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167639047
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,80 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Seq[AttributeReference],
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
--- End diff --

I think this is clear. In what way do you think this will be misinterpreted?


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167638689
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -37,22 +100,129 @@ case class DataSourceV2Relation(
   }
 
   override def newInstance(): DataSourceV2Relation = {
-copy(output = output.map(_.newInstance()))
+// projection is used to maintain id assignment.
+// if projection is not set, use output so the copy is not equal to 
the original
+copy(projection = projection.map(_.newInstance()))
   }
 }
 
 /**
  * A specialization of DataSourceV2Relation with the streaming bit set to 
true. Otherwise identical
  * to the non-streaming relation.
  */
-class StreamingDataSourceV2Relation(
+case class StreamingDataSourceV2Relation(
 output: Seq[AttributeReference],
-reader: DataSourceReader) extends DataSourceV2Relation(output, reader) 
{
+reader: DataSourceReader)
+extends LeafNode with DataSourceReaderHolder with 
MultiInstanceRelation {
   override def isStreaming: Boolean = true
+
+  override def canEqual(other: Any): Boolean = 
other.isInstanceOf[StreamingDataSourceV2Relation]
+
+  override def newInstance(): LogicalPlan = copy(output = 
output.map(_.newInstance()))
 }
 
 object DataSourceV2Relation {
-  def apply(reader: DataSourceReader): DataSourceV2Relation = {
-new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
+  private implicit class SourceHelpers(source: DataSourceV2) {
+def asReadSupport: ReadSupport = {
+  source match {
+case support: ReadSupport =>
+  support
+case _: ReadSupportWithSchema =>
+  // this method is only called if there is no user-supplied 
schema. if there is no
+  // user-supplied schema and ReadSupport was not implemented, 
throw a helpful exception.
+  throw new AnalysisException(s"Data source requires a 
user-supplied schema: $name")
+case _ =>
+  throw new AnalysisException(s"Data source is not readable: 
$name")
+  }
+}
+
+def asReadSupportWithSchema: ReadSupportWithSchema = {
+  source match {
+case support: ReadSupportWithSchema =>
+  support
+case _: ReadSupport =>
--- End diff --

For your second concern about checking ASAP: this will be done when the 
relation is first created because projection is required and is always based on 
the schema returned by a reader. To be more clear about when this should 
happen, I think that the requirement is for this to happen during job planning 
and, ideally, before filter push-down.

For the case where the user supplies a schema that is identical to the 
source's schema: I think this might be a bad idea because it will cause 
confusion when source schemas change. Plus, I can't think of a situation where 
it is a good idea to pass a schema that is ignored.

Here's an example of how this will be confusing: think of a job that 
supplies a schema identical to the table's schema and runs fine, so it goes 
into production. What happens when the table's schema changes? If someone adds 
a column to the table, then the job will start failing and report that the 
source doesn't support user-supplied schemas, even though it had previously 
worked just fine with a user-supplied schema. In addition, the change to the 
table is actually compatible with the old job because the new column will be 
removed by a projection.

To fix this situation, it may be tempting to use the user-supplied schema 
as an initial projection. But that doesn't make sense because we don't need two 
projection mechanisms. If we used this as a second way to project, it would be confusing that you can't actually leave out columns (at least for CSV), and it would be odd that this path lets you coerce types, which should usually be done by Spark.

I think it is best not to allow a user-supplied schema when it isn't 
supported by a source.
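
A small sketch of the scenario above, using only `org.apache.spark.sql.types` (assumes Spark SQL on the classpath): a user-supplied schema that equals the source's schema today stops matching as soon as a column is added, so an acceptance rule based on schema equality starts failing even though the old job is still logically compatible.

```scala
import org.apache.spark.sql.types._

object SchemaEvolutionDemo extends App {
  // Schema the job was written against, passed as the "user-supplied" schema.
  val userSchema = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType)))

  // The source's schema when the job went to production: identical.
  val sourceSchemaV1 = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType)))

  // Later, someone adds a column to the table.
  val sourceSchemaV2 = sourceSchemaV1.add(StructField("email", StringType))

  // An "accept the user schema only if it equals the source schema" rule
  // works at first and then breaks, even though the old job would still be
  // satisfied by projecting away the new column.
  println(userSchema == sourceSchemaV1) // true  -> job runs
  println(userSchema == sourceSchemaV2) // false -> job starts failing
}
```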


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167385858
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -37,22 +100,129 @@ case class DataSourceV2Relation(
   }
 
   override def newInstance(): DataSourceV2Relation = {
-copy(output = output.map(_.newInstance()))
+// projection is used to maintain id assignment.
+// if projection is not set, use output so the copy is not equal to 
the original
+copy(projection = projection.map(_.newInstance()))
   }
 }
 
 /**
  * A specialization of DataSourceV2Relation with the streaming bit set to 
true. Otherwise identical
  * to the non-streaming relation.
  */
-class StreamingDataSourceV2Relation(
+case class StreamingDataSourceV2Relation(
 output: Seq[AttributeReference],
-reader: DataSourceReader) extends DataSourceV2Relation(output, reader) 
{
+reader: DataSourceReader)
+extends LeafNode with DataSourceReaderHolder with 
MultiInstanceRelation {
   override def isStreaming: Boolean = true
+
+  override def canEqual(other: Any): Boolean = 
other.isInstanceOf[StreamingDataSourceV2Relation]
+
+  override def newInstance(): LogicalPlan = copy(output = 
output.map(_.newInstance()))
 }
 
 object DataSourceV2Relation {
-  def apply(reader: DataSourceReader): DataSourceV2Relation = {
-new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
+  private implicit class SourceHelpers(source: DataSourceV2) {
+def asReadSupport: ReadSupport = {
+  source match {
+case support: ReadSupport =>
+  support
+case _: ReadSupportWithSchema =>
+  // this method is only called if there is no user-supplied 
schema. if there is no
+  // user-supplied schema and ReadSupport was not implemented, 
throw a helpful exception.
+  throw new AnalysisException(s"Data source requires a 
user-supplied schema: $name")
+case _ =>
+  throw new AnalysisException(s"Data source is not readable: 
$name")
+  }
+}
+
+def asReadSupportWithSchema: ReadSupportWithSchema = {
+  source match {
+case support: ReadSupportWithSchema =>
+  support
+case _: ReadSupport =>
--- End diff --

Another concern is that this check should be done ASAP so that we can fail earlier.


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167385829
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -37,22 +100,129 @@ case class DataSourceV2Relation(
   }
 
   override def newInstance(): DataSourceV2Relation = {
-copy(output = output.map(_.newInstance()))
+// projection is used to maintain id assignment.
+// if projection is not set, use output so the copy is not equal to 
the original
+copy(projection = projection.map(_.newInstance()))
   }
 }
 
 /**
  * A specialization of DataSourceV2Relation with the streaming bit set to 
true. Otherwise identical
  * to the non-streaming relation.
  */
-class StreamingDataSourceV2Relation(
+case class StreamingDataSourceV2Relation(
 output: Seq[AttributeReference],
-reader: DataSourceReader) extends DataSourceV2Relation(output, reader) 
{
+reader: DataSourceReader)
+extends LeafNode with DataSourceReaderHolder with 
MultiInstanceRelation {
   override def isStreaming: Boolean = true
+
+  override def canEqual(other: Any): Boolean = 
other.isInstanceOf[StreamingDataSourceV2Relation]
+
+  override def newInstance(): LogicalPlan = copy(output = 
output.map(_.newInstance()))
 }
 
 object DataSourceV2Relation {
-  def apply(reader: DataSourceReader): DataSourceV2Relation = {
-new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
+  private implicit class SourceHelpers(source: DataSourceV2) {
+def asReadSupport: ReadSupport = {
+  source match {
+case support: ReadSupport =>
+  support
+case _: ReadSupportWithSchema =>
+  // this method is only called if there is no user-supplied 
schema. if there is no
+  // user-supplied schema and ReadSupport was not implemented, 
throw a helpful exception.
+  throw new AnalysisException(s"Data source requires a 
user-supplied schema: $name")
+case _ =>
+  throw new AnalysisException(s"Data source is not readable: 
$name")
+  }
+}
+
+def asReadSupportWithSchema: ReadSupportWithSchema = {
+  source match {
+case support: ReadSupportWithSchema =>
+  support
+case _: ReadSupport =>
--- End diff --

This is different from before: see 
https://github.com/apache/spark/pull/20387/files#diff-f70bda59304588cc3abfa3a9840653f4L214

Even if `userSpecifiedSchema` is not `None`, the source is still allowed to implement only `ReadSupport`, as long as its reader's schema is the same as the user-specified schema.
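
For reference, a rough sketch of that old fallback written as a hypothetical helper. The `ReaderFallback` name and the error messages are mine, and the code assumes it lives in the same `org.apache.spark.sql.execution.datasources.v2` package as the quoted diff, so the APIs it touches (`ReadSupport`, `ReadSupportWithSchema`, `DataSourceReader.readSchema`, `AnalysisException`) are the ones already used there.

```scala
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2.reader.DataSourceReader
import org.apache.spark.sql.types.StructType

// Hypothetical helper, not part of this PR: the pre-existing fallback in one place.
object ReaderFallback {
  def createReader(
      source: DataSourceV2,
      options: DataSourceOptions,
      userSpecifiedSchema: Option[StructType]): DataSourceReader = {
    (source, userSpecifiedSchema) match {
      case (s: ReadSupportWithSchema, Some(schema)) =>
        s.createReader(schema, options)

      case (s: ReadSupport, Some(schema)) =>
        // Fallback: the source cannot accept a schema, but if the schema its
        // reader reports is identical to the user-specified one, accept the
        // reader instead of failing the query.
        val reader = s.createReader(options)
        if (reader.readSchema() != schema) {
          throw new AnalysisException(
            s"${source.getClass.getName} does not allow user-specified schemas.")
        }
        reader

      case (s: ReadSupport, None) =>
        s.createReader(options)

      case (_: ReadSupportWithSchema, None) =>
        throw new AnalysisException(
          s"A schema must be specified when using ${source.getClass.getName}.")

      case _ =>
        throw new AnalysisException(s"${source.getClass.getName} is not readable.")
    }
  }
}
```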


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167385722
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,80 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Seq[AttributeReference],
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
--- End diff --

nit: it may be clearer to call it `userSpecifiedSchema`


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167374952
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,131 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
--- End diff --

I mentioned this below, but I should point it out on this thread, too: it 
is not correct to pass output to this relation. There's no guarantee that 
output will match the requested projection exactly, so in addition to the 
problem of leaking v2 details in the planner, this would make it easy to build 
a relation that doesn't correctly report its output.
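
To make the mismatch concrete, a toy sketch with invented types (`Col`, `ToyReader`, no Spark API): a reader that cannot honor the full requested projection is exactly why `output` must be derived from `readSchema()` rather than trusted from a constructor argument.

```scala
// Invented types; only illustrates why output must come from readSchema().
case class Col(name: String)

class ToyReader(allColumns: Seq[Col]) {
  private var kept: Seq[Col] = allColumns

  // Pretend this source can only prune trailing columns, a common limitation.
  def pruneColumns(requested: Seq[Col]): Unit =
    kept = allColumns.take(math.max(requested.size, 1))

  def readSchema(): Seq[Col] = kept
}

object OutputMismatchDemo extends App {
  val reader = new ToyReader(Seq(Col("a"), Col("b"), Col("c")))
  val projection = Seq(Col("c")) // what the planner asked for
  reader.pruneColumns(projection)
  println(reader.readSchema()) // List(Col(a)): not the requested column
  // An `output` fixed at construction time to match `projection` would now
  // misreport the rows this reader actually produces.
}
```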


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167374648
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,131 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
--- End diff --

Removing it.


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167374579
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,131 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
--- End diff --

I don't think it is correct to make `output` a constructor parameter here. 
The v2 read API allows implementations to return rows with a different schema 
than the one requested, so you don't know whether the projection and the output 
will actually match until you push the projection and look at the schema the 
reader returns.

If `output` were a constructor parameter, then it would be easy to accidentally create instances where the `output` doesn't match the actual rows 
returned by the source. That's why the current code uses `projection` to pass 
the requested columns, and always sets `output` correctly.

To make the guarantee that the column ids don't change, we don't strictly 
need `output` to be a constructor param. In fact, right now the only time this 
matters is when the projection isn't set. Otherwise, the ids are taken from the 
projection. I've considered a couple of options, like caching the conversion 
from schema to attributes, but I think the easiest option is to make sure that 
`projection` is always set.


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167369570
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,131 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+new DataSourceOptions(options.asJava)
+  }
+
+  private val sourceName: String = {
+source match {
+  case registered: DataSourceRegister =>
+registered.shortName()
+  case _ =>
+source.getClass.getSimpleName
+}
+  }
+
+  lazy val (
+  reader: DataSourceReader,
+  unsupportedFilters: Seq[Expression],
+  pushedFilters: Seq[Expression]) = {
+val newReader = userSchema match {
+  case Some(s) =>
+asReadSupportWithSchema.createReader(s, v2Options)
+  case _ =>
+asReadSupport.createReader(v2Options)
+}
+
+projection.foreach { attrs =>
+  DataSourceV2Relation.pushRequiredColumns(newReader, 
attrs.toStructType)
+}
+
+val (remainingFilters, pushedFilters) = filters match {
+  case Some(filterSeq) =>
+DataSourceV2Relation.pushFilters(newReader, filterSeq)
+  case _ =>
+(Nil, Nil)
+}
+
+(newReader, remainingFilters, pushedFilters)
+  }
+
+  private lazy val asReadSupport: ReadSupport = {
--- End diff --

I think the implementation is pretty clear:

```scala
if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) {
  ...
} else {
  loadV1Source(paths: _*)
}
```


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167347513
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,131 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+new DataSourceOptions(options.asJava)
+  }
+
+  private val sourceName: String = {
+source match {
+  case registered: DataSourceRegister =>
+registered.shortName()
+  case _ =>
+source.getClass.getSimpleName
+}
+  }
+
+  lazy val (
+  reader: DataSourceReader,
+  unsupportedFilters: Seq[Expression],
+  pushedFilters: Seq[Expression]) = {
+val newReader = userSchema match {
+  case Some(s) =>
+asReadSupportWithSchema.createReader(s, v2Options)
+  case _ =>
+asReadSupport.createReader(v2Options)
+}
+
+projection.foreach { attrs =>
+  DataSourceV2Relation.pushRequiredColumns(newReader, 
attrs.toStructType)
+}
+
+val (remainingFilters, pushedFilters) = filters match {
+  case Some(filterSeq) =>
+DataSourceV2Relation.pushFilters(newReader, filterSeq)
+  case _ =>
+(Nil, Nil)
+}
+
+(newReader, remainingFilters, pushedFilters)
+  }
+
+  private lazy val asReadSupport: ReadSupport = {
--- End diff --

E.g. I can't find where the old fallback logic 
(https://github.com/apache/spark/pull/20387/files#diff-f70bda59304588cc3abfa3a9840653f4L198)
 is in the new code. It takes more time for reviewers to make sure the new code is 
correct.


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167346995
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
 ---
@@ -17,130 +17,55 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
AttributeMap, AttributeSet, Expression, NamedExpression, PredicateHelper}
-import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources
-import org.apache.spark.sql.sources.v2.reader._
 
-/**
- * Pushes down various operators to the underlying data source for better 
performance. Operators are
- * being pushed down with a specific order. As an example, given a LIMIT 
has a FILTER child, you
- * can't push down LIMIT if FILTER is not completely pushed down. When 
both are pushed down, the
- * data source should execute FILTER before LIMIT. And required columns 
are calculated at the end,
- * because when more operators are pushed down, we may need less columns 
at Spark side.
- */
-object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with 
PredicateHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-// Note that, we need to collect the target operator along with 
PROJECT node, as PROJECT may
-// appear in many places for column pruning.
-// TODO: Ideally column pruning should be implemented via a plan 
property that is propagated
-// top-down, then we can simplify the logic here and only collect 
target operators.
-val filterPushed = plan transformUp {
-  case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, 
reader)) =>
-val (candidates, nonDeterministic) =
-  splitConjunctivePredicates(condition).partition(_.deterministic)
-
-val stayUpFilters: Seq[Expression] = reader match {
-  case r: SupportsPushDownCatalystFilters =>
-r.pushCatalystFilters(candidates.toArray)
-
-  case r: SupportsPushDownFilters =>
-// A map from original Catalyst expressions to corresponding 
translated data source
-// filters. If a predicate is not in this map, it means it 
cannot be pushed down.
-val translatedMap: Map[Expression, sources.Filter] = 
candidates.flatMap { p =>
-  DataSourceStrategy.translateFilter(p).map(f => p -> f)
-}.toMap
-
-// Catalyst predicate expressions that cannot be converted to 
data source filters.
-val nonConvertiblePredicates = 
candidates.filterNot(translatedMap.contains)
-
-// Data source filters that cannot be pushed down. An 
unhandled filter means
-// the data source cannot guarantee the rows returned can pass 
the filter.
-// As a result we must return it so Spark can plan an extra 
filter operator.
-val unhandledFilters = 
r.pushFilters(translatedMap.values.toArray).toSet
-val unhandledPredicates = translatedMap.filter { case (_, f) =>
-  unhandledFilters.contains(f)
-}.keys
-
-nonConvertiblePredicates ++ unhandledPredicates
-
-  case _ => candidates
-}
-
-val filterCondition = (stayUpFilters ++ 
nonDeterministic).reduceLeftOption(And)
-val withFilter = filterCondition.map(Filter(_, r)).getOrElse(r)
-if (withFilter.output == fields) {
-  withFilter
-} else {
-  Project(fields, withFilter)
-}
-}
-
-// TODO: add more push down rules.
-
-val columnPruned = pushDownRequiredColumns(filterPushed, 
filterPushed.outputSet)
-// After column pruning, we may have redundant PROJECT nodes in the 
query plan, remove them.
-RemoveRedundantProject(columnPruned)
-  }
-
-  // TODO: nested fields pruning
-  private def pushDownRequiredColumns(
-  plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = {
-plan match {
-  case p @ Project(projectList, child) =>
-val required = projectList.flatMap(_.references)
-p.copy(child = pushDownRequiredColumns(child, 
AttributeSet(required)))
-
-  case f @ Filter(condition, child) =>
-val required = requiredByPar

[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167346297
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,131 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+new DataSourceOptions(options.asJava)
+  }
+
+  private val sourceName: String = {
+source match {
+  case registered: DataSourceRegister =>
+registered.shortName()
+  case _ =>
+source.getClass.getSimpleName
+}
+  }
+
+  lazy val (
+  reader: DataSourceReader,
+  unsupportedFilters: Seq[Expression],
+  pushedFilters: Seq[Expression]) = {
+val newReader = userSchema match {
+  case Some(s) =>
+asReadSupportWithSchema.createReader(s, v2Options)
+  case _ =>
+asReadSupport.createReader(v2Options)
+}
+
+projection.foreach { attrs =>
+  DataSourceV2Relation.pushRequiredColumns(newReader, 
attrs.toStructType)
+}
+
+val (remainingFilters, pushedFilters) = filters match {
+  case Some(filterSeq) =>
+DataSourceV2Relation.pushFilters(newReader, filterSeq)
+  case _ =>
+(Nil, Nil)
+}
+
+(newReader, remainingFilters, pushedFilters)
+  }
+
+  private lazy val asReadSupport: ReadSupport = {
--- End diff --

Is there a problem? I think this is a clear way to handle the cases without mixing concerns.
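For context, a minimal sketch of the helper style being defended here, assuming the `source` and `sourceName` members shown in the diff above (a hedged approximation, not the exact PR code): each capability check lives in its own lazy val, so interface resolution and error reporting stay separate from reader creation.

    private lazy val asReadSupport: ReadSupport = source match {
      case support: ReadSupport =>
        support
      case _ =>
        throw new AnalysisException(s"Data source is not readable: $sourceName")
    }

    private lazy val asReadSupportWithSchema: ReadSupportWithSchema = source match {
      case support: ReadSupportWithSchema =>
        support
      case _ =>
        throw new AnalysisException(
          s"Data source does not support user-supplied schemas: $sourceName")
    }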


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167346043
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
 ---
@@ -17,130 +17,55 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
AttributeMap, AttributeSet, Expression, NamedExpression, PredicateHelper}
-import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources
-import org.apache.spark.sql.sources.v2.reader._
 
-/**
- * Pushes down various operators to the underlying data source for better 
performance. Operators are
- * being pushed down with a specific order. As an example, given a LIMIT 
has a FILTER child, you
- * can't push down LIMIT if FILTER is not completely pushed down. When 
both are pushed down, the
- * data source should execute FILTER before LIMIT. And required columns 
are calculated at the end,
- * because when more operators are pushed down, we may need less columns 
at Spark side.
- */
-object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with 
PredicateHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-// Note that, we need to collect the target operator along with 
PROJECT node, as PROJECT may
-// appear in many places for column pruning.
-// TODO: Ideally column pruning should be implemented via a plan 
property that is propagated
-// top-down, then we can simplify the logic here and only collect 
target operators.
-val filterPushed = plan transformUp {
-  case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, 
reader)) =>
-val (candidates, nonDeterministic) =
-  splitConjunctivePredicates(condition).partition(_.deterministic)
-
-val stayUpFilters: Seq[Expression] = reader match {
-  case r: SupportsPushDownCatalystFilters =>
-r.pushCatalystFilters(candidates.toArray)
-
-  case r: SupportsPushDownFilters =>
-// A map from original Catalyst expressions to corresponding 
translated data source
-// filters. If a predicate is not in this map, it means it 
cannot be pushed down.
-val translatedMap: Map[Expression, sources.Filter] = 
candidates.flatMap { p =>
-  DataSourceStrategy.translateFilter(p).map(f => p -> f)
-}.toMap
-
-// Catalyst predicate expressions that cannot be converted to 
data source filters.
-val nonConvertiblePredicates = 
candidates.filterNot(translatedMap.contains)
-
-// Data source filters that cannot be pushed down. An 
unhandled filter means
-// the data source cannot guarantee the rows returned can pass 
the filter.
-// As a result we must return it so Spark can plan an extra 
filter operator.
-val unhandledFilters = 
r.pushFilters(translatedMap.values.toArray).toSet
-val unhandledPredicates = translatedMap.filter { case (_, f) =>
-  unhandledFilters.contains(f)
-}.keys
-
-nonConvertiblePredicates ++ unhandledPredicates
-
-  case _ => candidates
-}
-
-val filterCondition = (stayUpFilters ++ 
nonDeterministic).reduceLeftOption(And)
-val withFilter = filterCondition.map(Filter(_, r)).getOrElse(r)
-if (withFilter.output == fields) {
-  withFilter
-} else {
-  Project(fields, withFilter)
-}
-}
-
-// TODO: add more push down rules.
-
-val columnPruned = pushDownRequiredColumns(filterPushed, 
filterPushed.outputSet)
-// After column pruning, we may have redundant PROJECT nodes in the 
query plan, remove them.
-RemoveRedundantProject(columnPruned)
-  }
-
-  // TODO: nested fields pruning
-  private def pushDownRequiredColumns(
-  plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = {
-plan match {
-  case p @ Project(projectList, child) =>
-val required = projectList.flatMap(_.references)
-p.copy(child = pushDownRequiredColumns(child, 
AttributeSet(required)))
-
-  case f @ Filter(condition, child) =>
-val required = requiredByParent

[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167345858
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,131 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
--- End diff --

This is currently handled by setting the projection so that the same ids are always reused, but it may not cover all copy cases. I'll look into how we can reliably move output to the constructor params, but that needs to avoid forcing callers to configure a reader.


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167344956
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
 ---
@@ -17,130 +17,55 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
AttributeMap, AttributeSet, Expression, NamedExpression, PredicateHelper}
-import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources
-import org.apache.spark.sql.sources.v2.reader._
 
-/**
- * Pushes down various operators to the underlying data source for better 
performance. Operators are
- * being pushed down with a specific order. As an example, given a LIMIT 
has a FILTER child, you
- * can't push down LIMIT if FILTER is not completely pushed down. When 
both are pushed down, the
- * data source should execute FILTER before LIMIT. And required columns 
are calculated at the end,
- * because when more operators are pushed down, we may need less columns 
at Spark side.
- */
-object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with 
PredicateHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-// Note that, we need to collect the target operator along with 
PROJECT node, as PROJECT may
-// appear in many places for column pruning.
-// TODO: Ideally column pruning should be implemented via a plan 
property that is propagated
-// top-down, then we can simplify the logic here and only collect 
target operators.
-val filterPushed = plan transformUp {
-  case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, 
reader)) =>
-val (candidates, nonDeterministic) =
-  splitConjunctivePredicates(condition).partition(_.deterministic)
-
-val stayUpFilters: Seq[Expression] = reader match {
-  case r: SupportsPushDownCatalystFilters =>
-r.pushCatalystFilters(candidates.toArray)
-
-  case r: SupportsPushDownFilters =>
-// A map from original Catalyst expressions to corresponding 
translated data source
-// filters. If a predicate is not in this map, it means it 
cannot be pushed down.
-val translatedMap: Map[Expression, sources.Filter] = 
candidates.flatMap { p =>
-  DataSourceStrategy.translateFilter(p).map(f => p -> f)
-}.toMap
-
-// Catalyst predicate expressions that cannot be converted to 
data source filters.
-val nonConvertiblePredicates = 
candidates.filterNot(translatedMap.contains)
-
-// Data source filters that cannot be pushed down. An 
unhandled filter means
-// the data source cannot guarantee the rows returned can pass 
the filter.
-// As a result we must return it so Spark can plan an extra 
filter operator.
-val unhandledFilters = 
r.pushFilters(translatedMap.values.toArray).toSet
-val unhandledPredicates = translatedMap.filter { case (_, f) =>
-  unhandledFilters.contains(f)
-}.keys
-
-nonConvertiblePredicates ++ unhandledPredicates
-
-  case _ => candidates
-}
-
-val filterCondition = (stayUpFilters ++ 
nonDeterministic).reduceLeftOption(And)
-val withFilter = filterCondition.map(Filter(_, r)).getOrElse(r)
-if (withFilter.output == fields) {
-  withFilter
-} else {
-  Project(fields, withFilter)
-}
-}
-
-// TODO: add more push down rules.
-
-val columnPruned = pushDownRequiredColumns(filterPushed, 
filterPushed.outputSet)
-// After column pruning, we may have redundant PROJECT nodes in the 
query plan, remove them.
-RemoveRedundantProject(columnPruned)
-  }
-
-  // TODO: nested fields pruning
-  private def pushDownRequiredColumns(
-  plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = {
-plan match {
-  case p @ Project(projectList, child) =>
-val required = projectList.flatMap(_.references)
-p.copy(child = pushDownRequiredColumns(child, 
AttributeSet(required)))
-
-  case f @ Filter(condition, child) =>
-val required = requiredByPar

[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167344060
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,131 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
--- End diff --

Sorry, I just recalled that there is an (undocumented) rule that a leaf node should take `output` as a constructor parameter. The reason is that, if the plan gets transformed and copied, the `output` cannot change; otherwise the parent node's expressions may refer to non-existent attributes.
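To illustrate the rule, here is a hedged, hypothetical sketch (`StableRelation` is made up for this example and is not Spark code): a parent operator holds on to the leaf's attributes by `ExprId`, so if a copied leaf re-derived its `output` with fresh ids, the parent's references would no longer resolve.

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Project}
    import org.apache.spark.sql.types.IntegerType

    // output is a constructor parameter, so copy() carries the same attribute instances
    case class StableRelation(output: Seq[AttributeReference]) extends LeafNode

    val a = AttributeReference("a", IntegerType)()
    val plan = Project(Seq(a), StableRelation(Seq(a)))

    // the Project's reference to `a` still resolves after a transform/copy, because the
    // copied relation produces exactly the same ExprId; a leaf that re-derived output
    // with new ids would leave this reference dangling
    assert(plan.resolved)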


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167343401
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,131 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+new DataSourceOptions(options.asJava)
+  }
+
+  private val sourceName: String = {
+source match {
+  case registered: DataSourceRegister =>
+registered.shortName()
+  case _ =>
+source.getClass.getSimpleName
+}
+  }
+
+  lazy val (
+  reader: DataSourceReader,
+  unsupportedFilters: Seq[Expression],
+  pushedFilters: Seq[Expression]) = {
+val newReader = userSchema match {
+  case Some(s) =>
+asReadSupportWithSchema.createReader(s, v2Options)
+  case _ =>
+asReadSupport.createReader(v2Options)
+}
+
+projection.foreach { attrs =>
+  DataSourceV2Relation.pushRequiredColumns(newReader, 
attrs.toStructType)
+}
+
+val (remainingFilters, pushedFilters) = filters match {
+  case Some(filterSeq) =>
+DataSourceV2Relation.pushFilters(newReader, filterSeq)
+  case _ =>
+(Nil, Nil)
+}
+
+(newReader, remainingFilters, pushedFilters)
+  }
+
+  private lazy val asReadSupport: ReadSupport = {
--- End diff --

It's a little hard to map the new logic to what we have in 
https://github.com/apache/spark/pull/20387/files#diff-f70bda59304588cc3abfa3a9840653f4L201

Can we keep the previous code style, i.e. `val reader = (ds, userSpecifiedSchema) match ...`?
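For reference, a rough sketch of the single-match style being requested, hedged as an approximation of the earlier `DataFrameReader` code rather than a verbatim copy (`ds`, `userSpecifiedSchema`, `v2Options`, and `sourceName` are assumed to be in scope):

    val reader: DataSourceReader = (ds, userSpecifiedSchema) match {
      case (s: ReadSupportWithSchema, Some(schema)) =>
        s.createReader(schema, v2Options)

      case (s: ReadSupport, None) =>
        s.createReader(v2Options)

      case (s: ReadSupport, Some(schema)) =>
        val r = s.createReader(v2Options)
        // a source without schema support must still produce the requested schema
        if (r.readSchema() != schema) {
          throw new AnalysisException(s"$sourceName does not allow user-specified schemas.")
        }
        r

      case (_: ReadSupportWithSchema, None) =>
        throw new AnalysisException(s"A schema needs to be specified when using $sourceName.")

      case _ =>
        throw new AnalysisException(s"$sourceName does not support data reading.")
    }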


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167343364
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,131 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
--- End diff --

This is a bad idea because it means that any place that creates this 
relation needs to know to create a reader and get its schema to pass in. That's 
confusing because some code will pass a user schema and some code will pass a 
projection.

Keeping this logic inside of the relation ensures that it is done correctly 
and without leaking details from the v2 API into the rest of the planner.


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167342730
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,131 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
--- End diff --

The comment is outdated.


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167342554
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,131 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
--- End diff --

Since we need to keep the "pushed columns" anyway, can we just use the `output` field? The only difference between `output` and `projection` is the default value, and that can be solved by creating a temporary reader and getting its schema in `DataFrameReader`. We already do this in `DataStreamReader.load`.
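A rough sketch of that suggestion (hedged; `ds`, `userSpecifiedSchema`, and `v2Options` are assumed to be the usual names in `DataFrameReader.load`, and the exact relation constructor is left out): create a throwaway reader just to learn the schema, then hand concrete output attributes to the relation.

    val tempReader = (ds, userSpecifiedSchema) match {
      case (s: ReadSupportWithSchema, Some(schema)) => s.createReader(schema, v2Options)
      case (s: ReadSupport, _)                      => s.createReader(v2Options)
    }
    // derive the default output once, so the relation can take it as a constructor parameter
    val output: Seq[AttributeReference] = tempReader.readSchema().toAttributes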


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167320953
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,130 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
--- End diff --

I agree that it's a bad idea to run push-down here. I fixed this by implementing `doCanonicalize` and returning a node that overrides the `output` val. I think that is cleaner than pulling logic out of the relation: there's no need for every place that creates a relation to also have to get the output of a reader.
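One possible shape of that `doCanonicalize` fix, sketched under assumptions (the anonymous subclass and the use of `QueryPlan.normalizeExprId` are choices made for this sketch, not necessarily the PR's exact code; it also assumes `org.apache.spark.sql.catalyst.plans.QueryPlan` is imported): the canonical copy pins `output` up front, so canonicalization never triggers reader creation or push-down.

    override def doCanonicalize(): LogicalPlan = {
      // normalize expr ids against the already-computed output, without consulting the reader
      val canonicalOutput = output.map(a => QueryPlan.normalizeExprId(a, output))
      new DataSourceV2Relation(source, options, projection, filters, userSchema) {
        override lazy val output: Seq[AttributeReference] = canonicalOutput
      }
    }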


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167295506
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,130 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+new DataSourceOptions(options.asJava)
--- End diff --

I think you're missing that it's a bad idea to pass `DataSourceOptions` to the relation in the first place. As I've said, this class is specific to v2 readers and writers, and doesn't need to be leaked into logical plans.

To make this a bit more clear: if we pass the options to the relation, then every time a new code path instantiates the relation, someone has to know where that central place to create options is. You're trading the problem for one that is only slightly better.

What if every relation required its own specific classes to pass the same information? That would be a mess. Rather than knowing what custom object to instantiate and where the helper for it is, it makes more sense to pass plain data to the relation. Then we can also use the info in match expressions.
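A small sketch of that last point (hedged; the option key and the caller are hypothetical): with a plain `Map`, callers construct the relation without knowing about `DataSourceOptions`, and rules can pattern-match on the options directly.

    // a caller only needs ordinary data (constructor as declared in this diff)
    val relation = DataSourceV2Relation(
      source = myV2Source,                  // assumed: some DataSourceV2 instance
      options = Map("path" -> "/tmp/data"))

    // the options stay usable in match expressions, e.g. inside an analyzer or optimizer rule
    val hasPath = relation match {
      case DataSourceV2Relation(_, opts, _, _, _) => opts.contains("path")
    }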


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167288341
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,130 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+new DataSourceOptions(options.asJava)
+  }
+
+  private val sourceName: String = {
+source match {
+  case registered: DataSourceRegister =>
+registered.shortName()
+  case _ =>
+source.getClass.getSimpleName
+}
+  }
+
+  lazy val (
+  reader: DataSourceReader,
+  unsupportedFilters: Seq[Expression],
+  pushedFilters: Seq[Expression]) = {
+val newReader = userSchema match {
+  case Some(s) =>
+asReadSupportWithSchema.createReader(s, v2Options)
+  case _ =>
+asReadSupport.createReader(v2Options)
+}
+
+projection.foreach { attrs =>
+  DataSourceV2Relation.pushRequiredColumns(newReader, 
attrs.toStructType)
+}
+
+val (remainingFilters, pushedFilters) = filters match {
+  case Some(filterSeq) =>
+DataSourceV2Relation.pushFilters(newReader, filterSeq)
+  case _ =>
+(Nil, Nil)
+}
+
+(newReader, remainingFilters, pushedFilters)
+  }
 
-  override def canEqual(other: Any): Boolean = 
other.isInstanceOf[DataSourceV2Relation]
+  def writer(dfSchema: StructType, mode: SaveMode): 
Option[DataSourceWriter] = {
--- End diff --

There's a key difference here that you're missing. The streaming API was 
rushed and committed without adequate review input. The comment indicated that 
the commit was probably unfinished and, if so, the remaining changes could be 
put into a completely different commit.

In contrast, this contained a few additional things that make 
`DataSourceV2Relation` complete and more useful for the next PRs. Thinking 
about how this relat

[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167147910
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,130 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
--- End diff --

I pulled your code and played with it. Your PR does fix the bug, but in a hacky way. Let me explain what happens:
1. `QueryPlan.canonicalized` is called and every expression in `DataSourceV2Relation` is canonicalized, including `DataSourceV2Relation.projection`. This means the attributes in `projection` are all renamed to "none" (a small sketch of this follows below).
2. `DataSourceV2Relation.output` is called, which triggers the creation of the reader and applies filter push-down and column pruning. Note that because all attributes were renamed to "none", we are actually pushing invalid filters and columns to the data source.
3. `reader.schema` and `projection` are lined up to get the actual output. Because all names are "none", it works.

However, step 2 is pretty dangerous: Spark doesn't define the behavior of pushing invalid filters and columns, especially what `reader.schema` should return after invalid columns are pushed down.

I prefer my original fix, which puts `output` in `DataSourceV2Relation`'s constructor parameters and updates it when doing column pruning in `PushDownOperatorsToDataSource`.
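A tiny sketch of the renaming described in step 1 (hedged; this only mimics what canonicalization does to an attribute, it is not the canonicalization code itself):

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
    import org.apache.spark.sql.types.IntegerType

    val a = AttributeReference("i", IntegerType)()
    // canonicalization strips cosmetic details such as names and expr ids,
    // so a projection attribute effectively becomes:
    val canonical = a.withName("none").withExprId(ExprId(0))
    // pushing `canonical` as a required column asks the source for a column literally
    // named "none", which is the undefined behavior called out above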


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167142433
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,130 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+new DataSourceOptions(options.asJava)
--- End diff --

We all agree that duplicating the logic for creating `DataSourceOptions` in many places is a bad idea. Currently there are two proposals:

1. Have a central place that takes care of the data source v2 resolution logic, including option creation. This is the approach of data source v1, i.e. the class `DataSource`.
2. Similar to proposal 1, but make `DataSourceV2Relation` the central place.

For now we don't know which one is better; it depends on how data source v2 evolves in the future. At this point in time, I think we should pick the simplest approach, which is passing the `DataSourceOptions` to `DataSourceV2Relation`. Then we just need a one-line change in `DataFrameReader` (roughly as sketched below), and don't need to add `v2Options` here.
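Roughly what that one-line change might look like (hedged sketch: a relation constructor that accepts `DataSourceOptions` is the proposal under discussion, not existing code, and `ds`, `sparkSession`, and `extraOptions` are assumed to be the usual `DataFrameReader` members):

    // inside DataFrameReader.load: wrap the collected options once and hand them over
    import scala.collection.JavaConverters._

    val v2Options = new DataSourceOptions(extraOptions.asJava)
    Dataset.ofRows(sparkSession, DataSourceV2Relation(ds, v2Options))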


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167141001
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,130 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+new DataSourceOptions(options.asJava)
+  }
+
+  private val sourceName: String = {
+source match {
+  case registered: DataSourceRegister =>
+registered.shortName()
+  case _ =>
+source.getClass.getSimpleName
+}
+  }
+
+  lazy val (
+  reader: DataSourceReader,
+  unsupportedFilters: Seq[Expression],
+  pushedFilters: Seq[Expression]) = {
+val newReader = userSchema match {
+  case Some(s) =>
+asReadSupportWithSchema.createReader(s, v2Options)
+  case _ =>
+asReadSupport.createReader(v2Options)
+}
+
+projection.foreach { attrs =>
+  DataSourceV2Relation.pushRequiredColumns(newReader, 
attrs.toStructType)
+}
+
+val (remainingFilters, pushedFilters) = filters match {
+  case Some(filterSeq) =>
+DataSourceV2Relation.pushFilters(newReader, filterSeq)
+  case _ =>
+(Nil, Nil)
+}
+
+(newReader, remainingFilters, pushedFilters)
+  }
 
-  override def canEqual(other: Any): Boolean = 
other.isInstanceOf[DataSourceV2Relation]
+  def writer(dfSchema: StructType, mode: SaveMode): 
Option[DataSourceWriter] = {
--- End diff --

I think we should avoid adding code that is unused now and only needed in the future. The streaming data source v2 was a bad example of this, and you already pointed that out. I hope we don't make the same mistake again.


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-08 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167005107
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,130 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+new DataSourceOptions(options.asJava)
+  }
+
+  private val sourceName: String = {
+source match {
+  case registered: DataSourceRegister =>
+registered.shortName()
+  case _ =>
+source.getClass.getSimpleName
+}
+  }
+
+  lazy val (
+  reader: DataSourceReader,
+  unsupportedFilters: Seq[Expression],
+  pushedFilters: Seq[Expression]) = {
+val newReader = userSchema match {
+  case Some(s) =>
+asReadSupportWithSchema.createReader(s, v2Options)
+  case _ =>
+asReadSupport.createReader(v2Options)
+}
+
+projection.foreach { attrs =>
+  DataSourceV2Relation.pushRequiredColumns(newReader, 
attrs.toStructType)
+}
+
+val (remainingFilters, pushedFilters) = filters match {
+  case Some(filterSeq) =>
+DataSourceV2Relation.pushFilters(newReader, filterSeq)
+  case _ =>
+(Nil, Nil)
+}
+
+(newReader, remainingFilters, pushedFilters)
+  }
 
-  override def canEqual(other: Any): Boolean = 
other.isInstanceOf[DataSourceV2Relation]
+  def writer(dfSchema: StructType, mode: SaveMode): 
Option[DataSourceWriter] = {
--- End diff --

If you want this removed to get this commit in, please say so.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-08 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167004846
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,130 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
--- End diff --

That commit was not reverted when I rebased. The test is still present and 
passing: 
https://github.com/apache/spark/blob/181946d1f1c5889661544830a77bd23c4b4f685a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala#L320-L336


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-08 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167001183
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,130 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+new DataSourceOptions(options.asJava)
--- End diff --

Also, keep in mind that this is a lazy val. It is only referenced when 
creating a reader or writer.
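
As a minimal, standalone sketch of the lazy val behavior being pointed out here 
(hypothetical names, not the PR's code): the wrapped value is built at most once per 
instance and only on first access, so copies made during plan transformations pay 
nothing until a reader is actually requested.

```scala
// Hypothetical stand-in for the relation, just to show lazy val semantics.
case class Relation(options: Map[String, String]) {
  lazy val v2Options: Map[String, String] = {
    println("wrapping options")  // runs at most once per Relation instance
    options
  }
}

val r = Relation(Map("path" -> "/tmp/data"))
val copied = r.copy()  // nothing built yet; lazy vals are evaluated per instance
copied.v2Options       // prints "wrapping options" on first access
copied.v2Options       // cached; nothing printed
```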


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-08 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166998163
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,130 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
--- End diff --

I'll have a look. I didn't realize you'd committed that one already.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-08 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166997697
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,130 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+new DataSourceOptions(options.asJava)
+  }
+
+  private val sourceName: String = {
+source match {
+  case registered: DataSourceRegister =>
+registered.shortName()
+  case _ =>
+source.getClass.getSimpleName
+}
+  }
+
+  lazy val (
+  reader: DataSourceReader,
+  unsupportedFilters: Seq[Expression],
+  pushedFilters: Seq[Expression]) = {
+val newReader = userSchema match {
+  case Some(s) =>
+asReadSupportWithSchema.createReader(s, v2Options)
+  case _ =>
+asReadSupport.createReader(v2Options)
+}
+
+projection.foreach { attrs =>
+  DataSourceV2Relation.pushRequiredColumns(newReader, 
attrs.toStructType)
+}
+
+val (remainingFilters, pushedFilters) = filters match {
+  case Some(filterSeq) =>
+DataSourceV2Relation.pushFilters(newReader, filterSeq)
+  case _ =>
+(Nil, Nil)
+}
+
+(newReader, remainingFilters, pushedFilters)
+  }
 
-  override def canEqual(other: Any): Boolean = 
other.isInstanceOf[DataSourceV2Relation]
+  def writer(dfSchema: StructType, mode: SaveMode): 
Option[DataSourceWriter] = {
--- End diff --

No, it isn't. But a relation should be able to return a writer. This is 
going to be needed as we improve the logical plans used by v2.
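
As a hedged illustration of that direction (not code from this PR): a later logical 
write path could ask the relation for its writer instead of re-resolving the source. 
`WritePlanning` and `resolveWriter` are assumed names used only for the sketch.

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter

object WritePlanning {
  // Sketch: resolve a writer for appending `query` into the relation's source.
  def resolveWriter(relation: DataSourceV2Relation, query: LogicalPlan): DataSourceWriter = {
    // writer() returns None when the source does not implement WriteSupport
    relation.writer(query.schema, SaveMode.Append).getOrElse {
      throw new UnsupportedOperationException(
        s"Data source does not support writes: ${relation.source.getClass.getName}")
    }
  }
}
```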


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-08 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166997241
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,130 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+new DataSourceOptions(options.asJava)
--- End diff --

As we've already discussed at length, I think it is a bad idea to create 
`DataSourceOptions` and pass it to the relation.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166827975
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,130 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
--- End diff --

This reverts https://github.com/apache/spark/pull/20485. Can we still pass 
the test?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166827720
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,130 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+new DataSourceOptions(options.asJava)
--- End diff --

This makes us create a new `DataSourceOptions` every time 
`DataSourceV2Relation` gets transformed and copied. Can we just create the 
`DataSourceOptions` in `DataFrameReader` and pass it to `DataSourceV2Relation`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166827284
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,130 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+new DataSourceOptions(options.asJava)
+  }
+
+  private val sourceName: String = {
+source match {
+  case registered: DataSourceRegister =>
+registered.shortName()
+  case _ =>
+source.getClass.getSimpleName
+}
+  }
+
+  lazy val (
+  reader: DataSourceReader,
+  unsupportedFilters: Seq[Expression],
+  pushedFilters: Seq[Expression]) = {
+val newReader = userSchema match {
+  case Some(s) =>
+asReadSupportWithSchema.createReader(s, v2Options)
+  case _ =>
+asReadSupport.createReader(v2Options)
+}
+
+projection.foreach { attrs =>
+  DataSourceV2Relation.pushRequiredColumns(newReader, 
attrs.toStructType)
+}
+
+val (remainingFilters, pushedFilters) = filters match {
+  case Some(filterSeq) =>
+DataSourceV2Relation.pushFilters(newReader, filterSeq)
+  case _ =>
+(Nil, Nil)
+}
+
+(newReader, remainingFilters, pushedFilters)
+  }
 
-  override def canEqual(other: Any): Boolean = 
other.isInstanceOf[DataSourceV2Relation]
+  def writer(dfSchema: StructType, mode: SaveMode): 
Option[DataSourceWriter] = {
--- End diff --

This method is not used anywhere.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166691646
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,139 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
--- End diff --

You've missed the point I was trying to make.

The idea isn't to make "surgical" commits that hardly change anything and must 
exactly match the JIRA description. If we did that, we would end up making only tiny 
commits, spending far too much time filling out JIRA issues, and getting buried under 
an unmanageable review backlog.

The point is to think about whether the content of a commit would confuse someone 
managing a branch or cause avoidable conflicts when reverting or picking commits.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166526321
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,139 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
--- End diff --

Sorry, I might not have made it clear. What I proposed is to include only the changes 
that are necessary for this PR. To make the plan immutable, what we need is:
1. keep the required columns and pushed filters in the plan. So here we add two 
parameters: `projection` and `filters`.
2. keep the things needed to create a `DataSourceReader`, i.e. 
`DataSourceV2`, `DataSourceOptions`, and `userSpecifiedSchema`.

For 1 we are good. For 2, I think the most intuitive and surgical way is to 
create the `DataSourceOptions` in `DataFrameReader` (the current behavior) and 
keep it in `DataSourceV2Relation` (the resulting shape is roughly sketched below).

There may be an argument for putting this common information in `simpleString`. 
I'm +1 on doing that, but it should be done after 
https://issues.apache.org/jira/browse/SPARK-23341, once the community agrees on an 
initial list of common information (which may be more than just table and path).

I really like your proposal about "don’t mix partial, unrelated changes 
into a commit", and I hope the whole open source community can follow it and make 
high-quality PRs. Thanks!
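
As a rough, hedged sketch only of the shape described above (not this PR's code; the 
class name and the choice to store `DataSourceOptions` directly in the plan are part 
of the proposal under discussion, not settled):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
import org.apache.spark.sql.types.StructType

// 1. projection and filters keep the pushdown state in the immutable plan
// 2. source, options, and userSpecifiedSchema are what is needed to build a reader
case class ProposedDataSourceV2Relation(
    source: DataSourceV2,
    options: DataSourceOptions,              // built once in DataFrameReader under this proposal
    projection: Seq[AttributeReference],
    filters: Seq[Expression],
    userSpecifiedSchema: Option[StructType]) extends LeafNode {

  override def output: Seq[AttributeReference] = projection
}
```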


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166394747
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,151 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
--- End diff --

I'm not saying that `DataSourceOptions` have to be handled in the relation. 
Just that the relation should use the same classes to pass data, like 
`TableIdentifier`, that are used by the rest of the planner. I agree with those 
benefits of doing this.

Is there anything that needs to change in this PR? We can move where the 
options are created in a follow-up, but let me know if you think this would 
prevent this from getting merged.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166392277
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,151 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
--- End diff --

I agree that `DataFrameReader` should not be the only place that creates 
`DataSourceV2Relation`, so handling `DataSourceOptions` in `DataFrameReader` is 
a bad idea.

But handling `DataSourceOptions` in `DataSourceV2Relation` is not the only 
option. Like data source v1 (see `DataSource`), we can have a central place to 
deal with the data source plumbing, which can also minimize the number of places 
that need to handle `DataSourceOptions`.

The actual benefits I see are:
1. It's easier to write Catalyst rules to match a `DataSourceV2Relation` 
with a specific table identifier.
2. It's easier to reuse `DataSourceV2Relation` if we have a data source v3 
later, because we don't have any v2-specific stuff in the constructor.

For 1, I think we can solve it by defining a `DataSourceV2Relation.unapply` 
to match the table identifier (see the sketch below). For 2, I think it may not 
be worth considering at this point.

There might be more benefits I'm missing; please correct me if I'm 
wrong. Thanks!
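
A hedged sketch of the extractor idea for point 1. A separately named extractor 
object is used here (an assumption) so it does not clash with the case class's 
generated `unapply`, and it assumes the relation keeps a 
`table: Option[TableIdentifier]` field as in the diff above:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Matches a DataSourceV2Relation that carries a table identifier, so rules can
// match on the identifier without touching v2-specific internals.
object V2RelationWithTable {
  def unapply(plan: LogicalPlan): Option[(DataSourceV2Relation, TableIdentifier)] =
    plan match {
      case r: DataSourceV2Relation => r.table.map(ident => (r, ident))
      case _ => None
    }
}

// usage inside a rule body:
//   case V2RelationWithTable(relation, ident) if ident.table == "events" => ...
```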


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166386661
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,151 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
--- End diff --

I think that `TableIdentifier` and a string-to-string `Map` should be 
passed to `DataSourceV2Relation`, and that either the relation or 
`DataSourceOptions` should be responsible for creating `DataSourceOptions` with 
well-defined properties to pass the table information to implementations.

This minimizes the number of places that need to handle `DataSourceOptions` 
(which is specific to v2) and uses `TableIdentifier` and `Map` to match the 
rest of the planner nodes. For example, other read paths that can create 
`DataSourceV2Relation`, like resolution rules, use `TableIdentifier`.

I'm not currently advocating a position on how to configure 
`DataFrameReader` or `DataFrameWriter` or how to handle schemas.
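
For illustration only, a hedged sketch of the kind of resolution rule mentioned 
above; `ResolveV2Relations` and `lookupV2Source` are assumed names, not existing 
Spark APIs, and the relation constructor is the one proposed in this diff:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.sources.v2.DataSourceV2

object ResolveV2Relations extends Rule[LogicalPlan] {

  // Assumed helper: look the table up in some catalog and return its v2 source, if any.
  private def lookupV2Source(ident: TableIdentifier): Option[DataSourceV2] = None

  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case u @ UnresolvedRelation(ident) =>
      lookupV2Source(ident) match {
        case Some(source) =>
          // built from a TableIdentifier and a plain Map, like other plan nodes
          DataSourceV2Relation(source, options = Map.empty, table = Some(ident))
        case None =>
          u  // leave it unresolved for other rules
      }
  }
}
```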


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166168357
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,151 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
--- End diff --

Let me confirm your expectation for the data source v2 framework. First we 
define some standard information, like schema, path, and table. We provide dedicated 
methods in `DataFrameReader` and `DataFrameWriter` for end users to specify 
them, e.g. `DataFrameReader.schema/path/table`. Then this information is 
passed directly to the read/write logical plans, i.e. `DataSourceV2Relation` and 
`WriteToDataSourceV2`. In the end, the read/write logical plan constructs the 
`DataSourceOptions` and puts this standard information into the options with standard 
option keys. Do I understand it correctly?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166165255
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,151 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+"DataSourceV2Relation(" +
+  s"source=$sourceName${path.orElse(table).map(loc => 
s"($loc)").getOrElse("")}, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}] options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+path match {
+  case Some(p) =>
+updatedOptions.put("path", p)
+  case None =>
+updatedOptions.remove("path")
+}
+
+table.map { ident =>
+  updatedOptions.put("table", ident.table)
--- End diff --

Opened [SPARK-23341](https://issues.apache.org/jira/browse/SPARK-23341) for 
this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166165005
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,151 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+"DataSourceV2Relation(" +
+  s"source=$sourceName${path.orElse(table).map(loc => 
s"($loc)").getOrElse("")}, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}] options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+path match {
+  case Some(p) =>
+updatedOptions.put("path", p)
+  case None =>
+updatedOptions.remove("path")
+}
+
+table.map { ident =>
+  updatedOptions.put("table", ident.table)
--- End diff --

I think we agree here. I want to avoid doing this outside of either 
`DataSourceOptions` or `DataSourceV2Relation`. If we can create 
`DataSourceOptions` from an `Option[TableIdentifier]` and add a `getTable` 
accessor, then that works for me. My main motivation is to avoid having this 
piece of code copied throughout the SQL planner.
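
A hedged sketch of that shape; the key names, the `create` helper, and `getTable` 
are assumptions for illustration and are not existing `DataSourceOptions` API:

```scala
import java.util.Optional

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.sources.v2.DataSourceOptions

object V2TableOptions {
  val TABLE_KEY = "table"
  val DATABASE_KEY = "database"

  // Build the options in one place, folding the optional identifier into standard keys.
  def create(options: Map[String, String], table: Option[TableIdentifier]): DataSourceOptions = {
    val withTable = table match {
      case Some(ident) =>
        options ++ Map(TABLE_KEY -> ident.table) ++ ident.database.map(DATABASE_KEY -> _)
      case None =>
        options - TABLE_KEY - DATABASE_KEY
    }
    new DataSourceOptions(withTable.asJava)
  }

  // Read the identifier back through one accessor instead of copying key handling around.
  def getTable(options: DataSourceOptions): Option[TableIdentifier] =
    asScala(options.get(TABLE_KEY)).map(t => TableIdentifier(t, asScala(options.get(DATABASE_KEY))))

  private def asScala(value: Optional[String]): Option[String] =
    if (value.isPresent) Some(value.get) else None
}
```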


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166163929
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,151 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+"DataSourceV2Relation(" +
+  s"source=$sourceName${path.orElse(table).map(loc => 
s"($loc)").getOrElse("")}, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}] options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+path match {
+  case Some(p) =>
+updatedOptions.put("path", p)
+  case None =>
+updatedOptions.remove("path")
+}
+
+table.map { ident =>
+  updatedOptions.put("table", ident.table)
--- End diff --

This is something I want to avoid. In the end we still need to define some 
standard option keys, like `path` and `table`. It would be good if we could 
provide some methods for end users and data source developers to easily 
write and read these standard options. I agree this can be done in a different PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166081367
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,151 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
--- End diff --

I should clarify this as well: I like the idea of standardizing how 
implementations access the table name from `DataSourceOptions`. However, I 
don't think that is sufficient to remove `table` and `path` from the 
definition of `DataSourceV2Relation`, for the reasons above.

I think we *should* follow this commit with a plan for how implementations 
access these options. For now, it is good to put the creation of those options 
in a single place.


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r166030958
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -17,17 +17,151 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
--- End diff --

@cloud-fan, sorry if it was not clear: Yes, I have considered it, and I 
think it is a bad idea. I sent a note to the dev list about this issue as well, 
if you want more context. There are two main reasons:

1. Your proposal creates more places that are responsible for creating a 
`DataSourceOptions` with the right property names. All of the places where we 
have a `TableIdentifier` and want to convert to a `DataSourceV2Relation` need 
to copy the same logic and worry about using the same properties.
What you propose is hard to maintain and error-prone: what happens if 
we decide not to pass the database if it is `None` in a `TableIdentifier`? We 
would have to validate every place that creates a v2 relation. On the other 
hand, if we pass `TableIdentifier` here, we have one code path that converts. 
It is also easier for us to pass `TableIdentifier` to the data sources if we 
choose to update the API.
2. There is no reason to use `DataSourceOptions` outside of v2 at this 
point. This PR doesn't expose the v2-specific options class to other places in 
the codebase. Instead, it uses a map for generic options and classes that can 
be used in pattern matching where possible. And again, this has fewer places 
that create v2 internal classes, which is easier for maintenance.

If you want to add those methods to the options class so that 
implementations can easily access path and table name, then we can do that in a 
follow-up PR.
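
As a minimal sketch of the single-conversion-point argument (the helper and 
the key names below are illustrative assumptions, not code from this PR):

```scala
// Sketch: one helper owns the TableIdentifier/path -> option-key mapping,
// so no caller ever hand-rolls the key names.
import org.apache.spark.sql.catalyst.TableIdentifier

object V2OptionsSketch {
  def withIdentifiers(
      options: Map[String, String],
      path: Option[String],
      table: Option[TableIdentifier]): Map[String, String] = {
    // drop any stale identifier keys, then add back the ones that are set
    val base = options - "path" - "table" - "database"
    val withPath = path.map(p => base + ("path" -> p)).getOrElse(base)
    table match {
      case Some(ident) =>
        val withTable = withPath + ("table" -> ident.table)
        ident.database.map(db => withTable + ("database" -> db)).getOrElse(withTable)
      case None =>
        withPath
    }
  }
}
```

If the rule ever changes, for example dropping the `database` key when it is 
`None`, only this one helper has to change instead of every place that builds a v2 relation.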


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r165981421
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -17,17 +17,151 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
--- End diff --

Have you considered https://github.com/apache/spark/pull/20387#issuecomment-362148217?

I feel it's better to define these common options in `DataSourceOptions`, 
so that data source implementations can also access them easily.
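
For illustration, an implementation could then resolve its target from those 
common keys roughly as sketched below. The key names ("table", "database", 
"path") are assumptions for this discussion; `DataSourceOptions.get`, which 
returns a `java.util.Optional`, is the existing accessor:

```scala
// Rough sketch of an implementation reading the common keys; the key names
// used here are assumptions, not constants defined by DataSourceOptions.
import org.apache.spark.sql.sources.v2.DataSourceOptions

object ResolveTarget {
  def apply(options: DataSourceOptions): String = {
    val table = options.get("table")   // java.util.Optional[String]
    val path = options.get("path")
    if (table.isPresent) {
      val db = options.get("database")
      if (db.isPresent) s"${db.get}.${table.get}" else table.get
    } else if (path.isPresent) {
      path.get
    } else {
      throw new IllegalArgumentException("one of 'table' or 'path' must be set")
    }
  }
}
```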


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-02 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r165751660
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -17,15 +17,149 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport, ReadSupportWithSchema, WriteSupport}
 import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceV2Reader) extends LeafNode with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+path: Option[String] = None,
+table: Option[TableIdentifier] = None,
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
--- End diff --

I like this pattern.  I think it is important that the arguments to a query 
plan node are comprehensive so that it is easy to understand what is going on 
in the output of `explain()`.


---
