[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r138123207
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
+import 
org.apache.spark.sql.sources.v2.reader.downward.{CatalystFilterPushDownSupport, 
ColumnPruningSupport, FilterPushDownSupport}
+
+object DataSourceV2Strategy extends Strategy {
+  // TODO: write path
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+case PhysicalOperation(projects, filters, DataSourceV2Relation(output, 
reader)) =>
+  val attrMap = AttributeMap(output.zip(output))
+
+  val projectSet = AttributeSet(projects.flatMap(_.references))
+  val filterSet = AttributeSet(filters.flatMap(_.references))
+
+  // Match original case of attributes.
+  // TODO: nested fields pruning
+  val requiredColumns = (projectSet ++ filterSet).toSeq.map(attrMap)
--- End diff --

It seems reasonable to request only the columns that will be used, or that appear in residual filters after pushdown.
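
To make that concrete, a minimal sketch (reusing the names from the diff above, and assuming the pushdown call returns the residual, un-pushed filters):

```scala
// Sketch only: prune to the columns referenced by the projection plus whatever
// filters remain after pushdown, rather than by all of the original filters.
val residualFilters: Seq[Expression] = reader match {
  case r: CatalystFilterPushDownSupport => r.pushCatalystFilters(filters.toArray)
  case _ => filters
}
val requiredColumns =
  (AttributeSet(projects.flatMap(_.references)) ++
    AttributeSet(residualFilters.flatMap(_.references))).toSeq.map(attrMap)
```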


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137951790
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
+import 
org.apache.spark.sql.sources.v2.reader.downward.{CatalystFilterPushDownSupport, 
ColumnPruningSupport, FilterPushDownSupport}
+
+object DataSourceV2Strategy extends Strategy {
+  // TODO: write path
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+case PhysicalOperation(projects, filters, DataSourceV2Relation(output, 
reader)) =>
+  val attrMap = AttributeMap(output.zip(output))
+
+  val projectSet = AttributeSet(projects.flatMap(_.references))
+  val filterSet = AttributeSet(filters.flatMap(_.references))
+
+  // Match original case of attributes.
+  // TODO: nested fields pruning
+  val requiredColumns = (projectSet ++ filterSet).toSeq.map(attrMap)
--- End diff --

Do we need to request columns that are only referenced by pushed filters? 


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137951690
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
+import 
org.apache.spark.sql.sources.v2.reader.downward.{CatalystFilterPushDownSupport, 
ColumnPruningSupport, FilterPushDownSupport}
+
+object DataSourceV2Strategy extends Strategy {
+  // TODO: write path
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+case PhysicalOperation(projects, filters, DataSourceV2Relation(output, 
reader)) =>
+  val attrMap = AttributeMap(output.zip(output))
+
+  val projectSet = AttributeSet(projects.flatMap(_.references))
+  val filterSet = AttributeSet(filters.flatMap(_.references))
+
+  // Match original case of attributes.
+  // TODO: nested fields pruning
+  val requiredColumns = (projectSet ++ filterSet).toSeq.map(attrMap)
+  reader match {
+case r: ColumnPruningSupport =>
+  r.pruneColumns(requiredColumns.toStructType)
+case _ =>
+  }
+
+  val stayUpFilters: Seq[Expression] = reader match {
+case r: CatalystFilterPushDownSupport =>
+  r.pushCatalystFilters(filters.toArray)
+
+case r: FilterPushDownSupport =>
--- End diff --

Looks like `CatalystFilterPushDownSupport` and `FilterPushDownSupport` are mutually exclusive?


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-08 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137829674
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader
+import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport
+
+case class DataSourceV2Relation(
+output: Seq[AttributeReference],
+reader: DataSourceV2Reader) extends LeafNode {
+
+  override def computeStats(): Statistics = reader match {
+case r: StatisticsSupport => Statistics(sizeInBytes = 
r.getStatistics.sizeInBytes())
+case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes)
+  }
+}
+
+object DataSourceV2Relation {
+  def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
+new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
--- End diff --

I think we should add a way to provide partition values outside of the columnar reader. It wouldn't be too difficult to add a method on `ReadTask` that returns them, then create a joined row in the scan exec. Otherwise, this wastes a lot of memory on a scan.
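
A hypothetical sketch of that idea (the mix-in and `constantValues` are made up for illustration; joining would use the existing catalyst `JoinedRow` wrapper):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.ReadTask

// Hypothetical mix-in: a task exposes the values that are identical for every row
// it produces (e.g. partition values), so they are stored once per task rather than
// once per UnsafeRow.
trait HasConstantValues { this: ReadTask =>
  def constantValues: InternalRow
}

// In the scan exec (sketch), the shared values are appended to each data row:
//   val outputRow = new JoinedRow(dataRow, task.constantValues)
```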


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137699153
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/upward/StatisticsSupport.java
 ---
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.upward;
+
+/**
+ * A mix in interface for `DataSourceV2Reader`. Users can implement this 
interface to report
+ * statistics to Spark.
+ */
+public interface StatisticsSupport {
--- End diff --

I'd like to put column stats in a separate interface, because we already separate basic stats and column stats in `ANALYZE TABLE`.
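
A rough sketch of that split (trait and member names here are hypothetical, not the PR's API):

```scala
// Basic stats and per-column stats reported through separate mix-ins, mirroring how
// ANALYZE TABLE computes them separately.
trait BasicStatisticsSupport {
  def sizeInBytes: Long
  def rowCount: Option[Long]
}

trait ColumnStatisticsSupport extends BasicStatisticsSupport {
  // per-column stats keyed by column name: (min, max, nullCount), all best-effort
  def columnStats: Map[String, (Option[Any], Option[Any], Option[Long])]
}
```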


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137698996
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader
+import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport
+
+case class DataSourceV2Relation(
+output: Seq[AttributeReference],
+reader: DataSourceV2Reader) extends LeafNode {
+
+  override def computeStats(): Statistics = reader match {
+case r: StatisticsSupport => Statistics(sizeInBytes = 
r.getStatistics.sizeInBytes())
+case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes)
+  }
+}
+
+object DataSourceV2Relation {
+  def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
+new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
--- End diff --

In data source V2, we will delegate partition pruning to the data source, 
although we need to do some refactoring to make it happen.

> I was just looking into how the data source should provide partition 
data, or at least fields that are the same for all rows in a `ReadTask`. It 
would be nice to have a way to pass those up instead of materializing them in 
each `UnsafeRow`.

This can be achieved by the columnar reader. Consider a data source with a data column `i` and a partition column `j`: the returned columnar batch has two column vectors, one for `i` and one for `j`. The vector for `i` is a normal vector containing all the values of column `i` within this batch, while the vector for `j` is a constant vector that contains only a single value.
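
Illustrative only, with simplified stand-in types rather than Spark's internal column vector classes:

```scala
// A batch pairs a normal vector for data column `i` with a constant vector for
// partition column `j`: the partition value is stored once per batch, not per row.
sealed trait IntVector { def get(row: Int): Int }

final class NormalIntVector(values: Array[Int]) extends IntVector {
  def get(row: Int): Int = values(row)
}

final class ConstantIntVector(value: Int) extends IntVector {
  def get(row: Int): Int = value   // same value for every row in the batch
}

// e.g. a 3-row batch: i = [10, 20, 30], partition column j = 7 for the whole batch
// Seq(new NormalIntVector(Array(10, 20, 30)), new ConstantIntVector(7))
```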


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137597910
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader
+import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport
+
+case class DataSourceV2Relation(
+output: Seq[AttributeReference],
+reader: DataSourceV2Reader) extends LeafNode {
+
+  override def computeStats(): Statistics = reader match {
+case r: StatisticsSupport => Statistics(sizeInBytes = 
r.getStatistics.sizeInBytes())
+case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes)
+  }
+}
+
+object DataSourceV2Relation {
+  def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
+new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
--- End diff --

Are you saying that partition pruning isn't delegated to the data source in 
this interface?

I was just looking into how the data source should provide partition data, 
or at least fields that are the same for all rows in a `ReadTask`. It would be 
nice to have a way to pass those up instead of materializing them in each 
`UnsafeRow`.


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137595695
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader
+import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport
+
+case class DataSourceV2Relation(
+output: Seq[AttributeReference],
+reader: DataSourceV2Reader) extends LeafNode {
+
+  override def computeStats(): Statistics = reader match {
+case r: StatisticsSupport => Statistics(sizeInBytes = 
r.getStatistics.sizeInBytes())
+case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes)
+  }
+}
+
+object DataSourceV2Relation {
+  def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
+new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
--- End diff --

I left it as a [TODO](https://github.com/apache/spark/pull/19136#discussion_r137023744) because it needs some refactoring of the optimizer. For now `DataSourceV2Relation` represents a data source without any optimization: we apply these optimizations during planning. This is also a problem for data source v1, which is why we implement partition pruning as an optimizer rule instead of inside the data source: the optimizer needs to update the stats.


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137593873
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Serializable;
+
+/**
+ * A read task returned by a data source reader and is responsible to 
create the data reader.
+ * The relationship between `ReadTask` and `DataReader` is similar to 
`Iterable` and `Iterator`.
+ *
+ * Note that, the read task will be serialized and sent to executors, then 
the data reader will be
+ * created on executors and do the actual reading.
+ */
+public interface ReadTask extends Serializable {
+  /**
+   * The preferred locations for this read task to run faster, but Spark 
can't guarantee that this
+   * task will always run on these locations. Implementations should make 
sure that it can
+   * be run on any location.
+   */
+  default String[] preferredLocations() {
--- End diff --

This API matches `RDD.preferredLocations` directly; I'll add more documentation here.
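
For context, a minimal sketch of how those strings would flow into the scheduler (class names here are assumptions, not the PR's actual classes):

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.ReadTask

class DataSourceRDDPartition(val index: Int, val readTask: ReadTask) extends Partition

class DataSourceRDD(sc: SparkContext, readTasks: Seq[ReadTask]) extends RDD[Row](sc, Nil) {
  override protected def getPartitions: Array[Partition] =
    readTasks.zipWithIndex.map { case (t, i) => new DataSourceRDDPartition(i, t) }.toArray

  // The scheduler consumes exactly the host strings the ReadTask reports.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    split.asInstanceOf[DataSourceRDDPartition].readTask.preferredLocations()

  override def compute(split: Partition, context: TaskContext): Iterator[Row] =
    ???   // create the task's data reader and iterate it (omitted)
}
```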


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137591425
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader
+import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport
+
+case class DataSourceV2Relation(
+output: Seq[AttributeReference],
+reader: DataSourceV2Reader) extends LeafNode {
+
+  override def computeStats(): Statistics = reader match {
+case r: StatisticsSupport => Statistics(sizeInBytes = 
r.getStatistics.sizeInBytes())
+case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes)
+  }
+}
+
+object DataSourceV2Relation {
+  def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
+new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
--- End diff --

Is this the right schema? The docs for `readSchema` say it is the result of pushdown and projection, which doesn't seem appropriate for a `Relation`. Does the relation represent a table that can be filtered and projected, or does it represent a single read? At least in the Hive read path, it's a table.


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137587367
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SchemaRequiredDataSourceV2.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A variant of `DataSourceV2` which requires users to provide a schema 
when reading data. A data
+ * source can inherit both `DataSourceV2` and `SchemaRequiredDataSourceV2` 
if it supports both schema
+ * inference and user-specified schemas.
+ */
+public interface SchemaRequiredDataSourceV2 {
+
+  /**
+   * Create a `DataSourceV2Reader` to scan the data for this data source.
+   *
+   * @param schema the full schema of this data source reader. Full schema 
usually maps to the
+   *   physical schema of the underlying storage of this data 
source reader, e.g.
+   *   parquet files, JDBC tables, etc, while this reader may 
not read data with full
--- End diff --

Maybe update the doc here, since JDBC sources and Parquet files probably 
shouldn't implement this. CSV and JSON are the examples that come to mind for 
sources that require a schema.


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread RussellSpitzer
Github user RussellSpitzer commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137531270
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Serializable;
+
+/**
+ * A read task returned by a data source reader and is responsible to 
create the data reader.
+ * The relationship between `ReadTask` and `DataReader` is similar to 
`Iterable` and `Iterator`.
+ *
+ * Note that, the read task will be serialized and sent to executors, then 
the data reader will be
+ * created on executors and do the actual reading.
+ */
+public interface ReadTask extends Serializable {
+  /**
+   * The preferred locations for this read task to run faster, but Spark 
can't guarantee that this
+   * task will always run on these locations. Implementations should make 
sure that it can
+   * be run on any location.
+   */
+  default String[] preferredLocations() {
--- End diff --

These have previously been only IPs or hostnames. To match the RDD definition, I think we would have to continue with that.


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread ash211
Github user ash211 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137469790
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Serializable;
+
+/**
+ * A read task returned by a data source reader and is responsible to 
create the data reader.
+ * The relationship between `ReadTask` and `DataReader` is similar to 
`Iterable` and `Iterator`.
+ *
+ * Note that, the read task will be serialized and sent to executors, then 
the data reader will be
+ * created on executors and do the actual reading.
+ */
+public interface ReadTask extends Serializable {
+  /**
+   * The preferred locations for this read task to run faster, but Spark 
can't guarantee that this
+   * task will always run on these locations. Implementations should make 
sure that it can
+   * be run on any location.
+   */
+  default String[] preferredLocations() {
--- End diff --

What format are these strings expected to be in? If Spark will be placing this `ReadTask` onto an executor that is a preferred location, the format will need to be a documented part of the API.

Are there levels of preference, or is it only binary? I'm thinking node vs rack vs datacenter for on-prem clusters, or instance vs AZ vs region for cloud clusters.


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread ash211
Github user ash211 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137471056
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/upward/StatisticsSupport.java
 ---
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.upward;
+
+/**
+ * A mix in interface for `DataSourceV2Reader`. Users can implement this 
interface to report
+ * statistics to Spark.
+ */
+public interface StatisticsSupport {
--- End diff --

Some data sources have per-column statistics, like how many bytes a column has or its min/max (e.g. things required for CBO).

Should that be a separate interface from this one?


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137469668
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/scan/UnsafeRowScan.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.scan;
+
+import java.util.List;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.ReadTask;
+
+/**
+ * A mix-in interface for `DataSourceV2Reader`. Users can implement this 
interface to output
+ * unsafe rows directly and avoid the row copy at Spark side.
+ *
+ * Note that, this is an experimental and unstable interface, as 
`UnsafeRow` is not public and
+ * may get changed in future Spark versions.
+ */
+@Experimental
+@InterfaceStability.Unstable
+public interface UnsafeRowScan extends DataSourceV2Reader {
--- End diff --

cc @j-baker for the new unsafe row scan API. Programmatically, the unsafe row scan should be in the base class and the normal row scan in the child class. However, conceptually for a developer, the normal row scan is the basic interface and belongs in the base class, while the unsafe row scan is an add-on and belongs in the child class.
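
To illustrate the layering with simplified stand-ins for `DataSourceV2Reader` and `UnsafeRowScan` (signatures simplified, not the actual API):

```scala
import org.apache.spark.sql.sources.v2.reader.ReadTask

trait RowScanReader {
  def createReadTasks(): Seq[ReadTask]            // the developer-facing, row-based scan
}

trait UnsafeRowScanReader extends RowScanReader {
  def createUnsafeRowReadTasks(): Seq[ReadTask]   // the engine-oriented add-on
}

// Spark-side dispatch (sketch): the engine prefers the unsafe-row path whenever the
// mix-in is present, even though conceptually it lives in the child trait.
object ScanPlanner {
  def planScan(reader: RowScanReader): Seq[ReadTask] = reader match {
    case r: UnsafeRowScanReader => r.createUnsafeRowReadTasks()
    case r                      => r.createReadTasks()
  }
}
```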


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137455199
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java 
---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * A case-insensitive string-to-string map, which is used to represent 
data source options.
+ */
+public class DataSourceV2Options {
+  private Map<String, String> keyLowerCasedMap = new HashMap<>();
+
+  /**
+   * Adds one more entry to the options.
+   * This should only be called by Spark, not data source implementations.
+   */
+  public void addOption(String key, String value) {
--- End diff --

Good point, I'll make it immutable.
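
One possible shape for that, as a sketch of the idea (in Scala for brevity), not the final API:

```scala
import java.util.Locale

// Options are captured once at construction and only read access is exposed, so
// neither Spark nor the data source can add or change entries afterwards.
class CaseInsensitiveOptions(options: Map[String, String]) {
  private val keyLowerCasedMap: Map[String, String] =
    options.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }

  def get(key: String): Option[String] =
    keyLowerCasedMap.get(key.toLowerCase(Locale.ROOT))
}

// e.g. new CaseInsensitiveOptions(Map("Path" -> "/tmp/data")).get("path") == Some("/tmp/data")
```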


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread sureshthalamati
Github user sureshthalamati commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137454044
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java 
---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * A case-insensitive string-to-string map, which is used to represent 
data source options.
+ */
+public class DataSourceV2Options {
+  private Map<String, String> keyLowerCasedMap = new HashMap<>();
+
+  /**
+   * Adds one more entry to the options.
+   * This should only be called by Spark, not data source implementations.
+   */
+  public void addOption(String key, String value) {
--- End diff --

The check added for `addOption` protects against modifying the options passed to the data source, but the data source can still add new options by accident. I think it would be safer to pass a `DataSourceV2Options` that is unmodifiable by the data source.


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread sureshthalamati
Github user sureshthalamati commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137453919
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
 ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The main interface and minimal requirement for a data source reader. 
The implementations should
+ * at least implement the full scan logic, users can mix in more 
interfaces to implement scan
+ * optimizations like column pruning, filter push down, etc.
+ *
+ * There are mainly 2 kinds of scan optimizations:
+ *   1. push operators downward to the data source, e.g., column pruning, 
filter push down, etc.
+ *   2. propagate information upward to Spark, e.g., report statistics, 
report ordering, etc.
+ * Spark first applies all operator push down optimizations which this 
data source supports. Then
+ * Spark collects information this data source provides for further 
optimizations. Finally Spark
+ * issues the scan request and does the actual data reading.
+ */
+public abstract class DataSourceV2Reader {
+
+  /**
+   * Returns the actual schema of this data source reader, which may be 
different from the physical
+   * schema of the underlying storage, as column pruning or other 
optimizations may happen.
+   */
+  public abstract StructType readSchema();
+
+  /**
+   * Returns a list of read tasks, each task is responsible for outputting 
data for one RDD
+   * partition, which means the number of tasks returned here is same as 
the number of RDD
+   * partitions this scan outputs.
+   *
+   * Note that, this may not be a full scan if the data source reader 
mixes in other optimization
+   * interfaces like column pruning, filter push down, etc. These 
optimizations are applied before
+   * Spark issues the scan request.
+   */
+  protected abstract List createReadTasks();
+
+  /**
+   * Inside Spark, the input rows will be converted to `UnsafeRow`s before 
processing. To avoid
+   * this conversion, implementations can overwrite this method and output 
`UnsafeRow`s directly.
+   * Note that, this is an experimental and unstable interface, as 
`UnsafeRow` is not public and
+   * may get changed in future Spark versions.
+   *
+   * If implementations overwrite this method, `createReadTasks` will 
never be called and they can
+   * just throw an exception in `createReadTasks`.
+   */
+  @Experimental
+  @InterfaceStability.Unstable
+  public List createUnsafeRowReadTasks() {
--- End diff --

I really like the new API's flexibility for implementing the different types of support. Considering that `UnsafeRow` is unstable, would it be possible to move `createUnsafeRowReadTasks` to a different interface? That would let a data source implement two variants, one with `Row` and another with `UnsafeRow`, and make it easily configurable based on the Spark version.


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread sureshthalamati
Github user sureshthalamati commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137452697
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SchemaRequiredDataSourceV2.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A variant of `DataSourceV2` which requires users to provide a schema 
when reading data. A data
+ * source can inherit both `DataSourceV2` and `SchemaRequiredDataSourceV2` 
if it supports both schema
+ * inference and user-specified schemas.
--- End diff --

The check added for `addOption` protects against modifying the options passed to the data source, but the data source can still add new options by accident. I think it would be safer to pass a `DataSourceV2Options` that is **unmodifiable** by the data source.


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137435478
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SchemaRequiredDataSourceV2.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A variant of `DataSourceV2` which requires users to provide a schema 
when reading data. A data
+ * source can inherit both `DataSourceV2` and `SchemaRequiredDataSourceV2` 
if it supports both schema
+ * inference and user-specified schemas.
--- End diff --

cc @rdblue for the new API of schema reference.


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-06 Thread RussellSpitzer
Github user RussellSpitzer commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137333974
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
 ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The main interface and minimal requirement for a data source reader. 
The implementations should
+ * at least implement the full scan logic, users can mix in more 
interfaces to implement scan
+ * optimizations like column pruning, filter push down, etc.
+ *
+ * There are mainly 2 kinds of scan optimizations:
+ *   1. push operators downward to the data source, e.g., column pruning, 
filter push down, etc.
+ *   2. propagate information upward to Spark, e.g., report statistics, 
report ordering, etc.
+ * Spark first applies all operator push down optimizations which this 
data source supports. Then
+ * Spark collects information this data source provides for further 
optimizations. Finally Spark
+ * issues the scan request and does the actual data reading.
--- End diff --

This would be really nice imho.


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137023744
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java
 ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The main interface and minimal requirement for a data source reader. 
The implementations should
+ * at least implement the full scan logic, users can mix in more 
interfaces to implement scan
+ * optimizations like column pruning, filter push down, etc.
+ *
+ * There are mainly 2 kinds of scan optimizations:
+ *   1. push operators downward to the data source, e.g., column pruning, 
filter push down, etc.
+ *   2. propagate information upward to Spark, e.g., report statistics, 
report ordering, etc.
+ * Spark first applies all operator push down optimizations which this 
data source supports. Then
+ * Spark collects information this data source provides for further 
optimizations. Finally Spark
+ * issues the scan request and does the actual data reading.
--- End diff --

TODO: this is not true now, as we push down operators at the planning phase. We need to do some refactoring and move it to the optimization phase.


---




[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-05 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/19136

[DO NOT MERGE][SPARK-15689][SQL] data source v2

## What changes were proposed in this pull request?

This PR adds the infrastructure for data source v2 and implements the features that Spark already has in data source v1, i.e. column pruning, filter push down, catalyst expression filter push down, InternalRow scan, schema inference, and data size reporting. The write path is excluded to avoid making this PR grow too big; it will be added in a follow-up PR.

We should not merge this PR until the SPIP vote passes.

## How was this patch tested?

new tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark data-source-v2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19136.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19136


commit 543a40b2c9d4c456f8a726a12f78bd7f7d529b93
Author: Wenchen Fan 
Date:   2017-09-05T11:45:42Z

data source v2




---
