[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20153


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r161362565
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.*;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+
+public class JavaBatchDataSourceV2 implements DataSourceV2, ReadSupport {
+
+  class Reader implements DataSourceV2Reader, SupportsScanColumnarBatch {
--- End diff --

This is the convention. When a reader implements many mix-in interfaces, it is
better to write
```
MyReader extends DataSourceV2Reader, XXX, YYY, ZZZ ...
```
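
A minimal Java sketch of the convention described above, for illustration only. `BaseReader`, `SupportsBatch`, `SupportsPruning` and `MyReader` are hypothetical stand-ins, not Spark classes. The point is that Java accepts a redundant super-interface in the `implements` list, so naming the base interface first and the mix-ins after it is purely a readability choice.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for the Spark interfaces; only the inheritance shape matters.
interface BaseReader {
  String readSchema();
}

// The mix-in already extends the base interface, as SupportsScanColumnarBatch does.
interface SupportsBatch extends BaseReader {
  List<String> createBatchReadTasks();
}

interface SupportsPruning extends BaseReader {
  void pruneColumns(List<String> required);
}

// Listing BaseReader first is redundant for the compiler, but it keeps the
// "base interface, then mix-ins" shape visible when the mix-in list grows.
class MyReader implements BaseReader, SupportsBatch, SupportsPruning {
  private List<String> columns = Arrays.asList("i", "j");

  @Override
  public String readSchema() {
    return String.join(",", columns);
  }

  @Override
  public List<String> createBatchReadTasks() {
    return Arrays.asList("task-0", "task-1");
  }

  @Override
  public void pruneColumns(List<String> required) {
    columns = required;
  }
}

class MixinDemo {
  public static void main(String[] args) {
    MyReader reader = new MyReader();
    reader.pruneColumns(Arrays.asList("i"));
    System.out.println(reader.readSchema());
    System.out.println(reader.createBatchReadTasks().size());
  }
}
```

Dropping `BaseReader` from the `implements` clause would compile to the same type hierarchy; the explicit form only documents intent.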


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r161361844
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.List;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to output {@link ColumnarBatch} and make the scan faster.
+ */
+@InterfaceStability.Evolving
+public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
+  @Override
+  default List<ReadTask<Row>> createReadTasks() {
+    throw new IllegalStateException(
+      "createReadTasks not supported by default within SupportsScanColumnarBatch.");
+  }
+
+  /**
+   * Similar to {@link DataSourceV2Reader#createReadTasks()}, but returns columnar data in batches.
+   */
+  List<ReadTask<ColumnarBatch>> createBatchReadTasks();
+
+  /**
+   * A safety door for columnar batch reader. It's possible that the implementation can only support
+   * some certain columns with certain types. Users can overwrite this method and
+   * {@link #createReadTasks()} to fallback to normal read path under some conditions.
+   */
+  default boolean enableBatchRead() {
--- End diff --

From the documentation, it is hard to tell whether this method is used to
enable batch reading or to report whether this reader supports batch reading.


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r161362201
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.*;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+
+public class JavaBatchDataSourceV2 implements DataSourceV2, ReadSupport {
+
+  class Reader implements DataSourceV2Reader, SupportsScanColumnarBatch {
--- End diff --

Doesn't `SupportsScanColumnarBatch` already extend `DataSourceV2Reader`?


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-12 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r161362308
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.List;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to output {@link ColumnarBatch} and make the scan faster.
+ */
+@InterfaceStability.Evolving
+public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
+  @Override
+  default List<ReadTask<Row>> createReadTasks() {
+    throw new IllegalStateException(
+      "createReadTasks should not be called with SupportsScanColumnarBatch.");
+  }
+
+  /**
+   * Similar to {@link DataSourceV2Reader#createReadTasks()}, but returns columnar data in batches.
+   */
+  List<ReadTask<ColumnarBatch>> createBatchReadTasks();
+
+  /**
+   * A safety door for columnar batch reader. It's possible that the implementation can only support
+   * some certain columns with certain types. Users can overwrite this method and
+   * {@link #createReadTasks()} to fallback to normal read path under some conditions.
+   */
+  default boolean enableBatchRead() {
--- End diff --

I see. It would be good to clarify it in the comment.
For example, is this accurate? `A safety door for [[ColumnarBatch]] reader.`


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-12 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r161362205
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 ---
@@ -90,14 +92,56 @@ case class InMemoryTableScanExec(
     columnarBatch
   }
 
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    assert(supportCodegen)
+  private lazy val inputRDD: RDD[InternalRow] = {
     val buffers = filteredCachedBatches()
-    // HACK ALERT: This is actually an RDD[ColumnarBatch].
-    // We're taking advantage of Scala's type erasure here to pass these batches along.
-    Seq(buffers.map(createAndDecompressColumn(_)).asInstanceOf[RDD[InternalRow]])
+    if (supportsBatch) {
+      // HACK ALERT: This is actually an RDD[ColumnarBatch].
+      // We're taking advantage of Scala's type erasure here to pass these batches along.
+      buffers.map(createAndDecompressColumn).asInstanceOf[RDD[InternalRow]]
+    } else {
+      val numOutputRows = longMetric("numOutputRows")
+
+      if (enableAccumulators) {

+1


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-12 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r161265463
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala 
---
@@ -17,21 +17,24 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 
 
 /**
- * Helper trait for abstracting scan functionality using
- * [[ColumnarBatch]]es.
+ * Helper trait for abstracting scan functionality using [[ColumnarBatch]]es.
  */
 private[sql] trait ColumnarBatchScan extends CodegenSupport {
 
   def vectorTypes: Option[Seq[String]] = None
 
+  protected def supportsBatch: Boolean = true
--- End diff --

Add a comment to explain `supportsBatch`?


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-12 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r161257419
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -37,40 +35,58 @@ import org.apache.spark.sql.types.StructType
  */
 case class DataSourceV2ScanExec(
     fullOutput: Seq[AttributeReference],
-    @transient reader: DataSourceV2Reader) extends LeafExecNode with DataSourceReaderHolder {
+    @transient reader: DataSourceV2Reader)
+  extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
 
   override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]
 
-  override def references: AttributeSet = AttributeSet.empty
+  override def producedAttributes: AttributeSet = AttributeSet(fullOutput)
--- End diff --

+1


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-12 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r161252949
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 ---
@@ -90,14 +92,56 @@ case class InMemoryTableScanExec(
     columnarBatch
   }
 
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    assert(supportCodegen)
+  private lazy val inputRDD: RDD[InternalRow] = {
     val buffers = filteredCachedBatches()
-    // HACK ALERT: This is actually an RDD[ColumnarBatch].
-    // We're taking advantage of Scala's type erasure here to pass these batches along.
-    Seq(buffers.map(createAndDecompressColumn(_)).asInstanceOf[RDD[InternalRow]])
+    if (supportsBatch) {
+      // HACK ALERT: This is actually an RDD[ColumnarBatch].
+      // We're taking advantage of Scala's type erasure here to pass these batches along.
+      buffers.map(createAndDecompressColumn).asInstanceOf[RDD[InternalRow]]
+    } else {
+      val numOutputRows = longMetric("numOutputRows")
+
+      if (enableAccumulators) {

This conf is really confusing. Maybe rename it to
`enableAccumulatorsForTestingOnly`?


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-11 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r161034623
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -37,40 +35,58 @@ import org.apache.spark.sql.types.StructType
  */
 case class DataSourceV2ScanExec(
     fullOutput: Seq[AttributeReference],
-    @transient reader: DataSourceV2Reader) extends LeafExecNode with DataSourceReaderHolder {
+    @transient reader: DataSourceV2Reader)
+  extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
 
   override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]
 
-  override def references: AttributeSet = AttributeSet.empty
+  override def producedAttributes: AttributeSet = AttributeSet(fullOutput)
 
-  override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+  private lazy val readTasks: java.util.List[ReadTask[UnsafeRow]] = reader match {
+    case r: SupportsScanUnsafeRow => r.createUnsafeRowReadTasks()
+    case _ =>
+      reader.createReadTasks().asScala.map {
+        new RowToUnsafeRowReadTask(_, reader.readSchema()): ReadTask[UnsafeRow]
+      }.asJava
+  }
 
-  override protected def doExecute(): RDD[InternalRow] = {
-    val readTasks: java.util.List[ReadTask[UnsafeRow]] = reader match {
-      case r: SupportsScanUnsafeRow => r.createUnsafeRowReadTasks()
-      case _ =>
-        reader.createReadTasks().asScala.map {
-          new RowToUnsafeRowReadTask(_, reader.readSchema()): ReadTask[UnsafeRow]
-        }.asJava
-    }
+  private lazy val inputRDD: RDD[InternalRow] = reader match {
+    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
+      assert(!reader.isInstanceOf[ContinuousReader],
+        "continuous stream reader does not support columnar read yet.")
+      new DataSourceRDD(sparkContext, r.createBatchReadTasks()).asInstanceOf[RDD[InternalRow]]
+
+    case _ =>
--- End diff --

We can combine the inner case clause with the outer one, like:
```
reader match {
  case r: SupportsScanColumnarBatch if r.enableBatchRead() => ..
  case _: ContinuousReader => ..
  case _ => ..
}
```


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r160958116
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.List;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to output {@link ColumnarBatch} and make the scan faster.
+ */
+@InterfaceStability.Evolving
+public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
+  @Override
+  default List<ReadTask<Row>> createReadTasks() {
+    throw new IllegalStateException(
+      "createReadTasks should not be called with SupportsScanColumnarBatch.");
+  }
+
+  /**
+   * Similar to {@link DataSourceV2Reader#createReadTasks()}, but returns columnar data in batches.
+   */
+  List<ReadTask<ColumnarBatch>> createBatchReadTasks();
+
+  /**
+   * A safety door for columnar batch reader. It's possible that the implementation can only support
+   * some certain columns with certain types. Users can overwrite this method and
+   * {@link #createReadTasks()} to fallback to normal read path under some conditions.
+   */
+  default boolean enableBatchRead() {
--- End diff --

Yes, you can interpret it that way (reading data from columnar storage or row
storage), but we can also interpret it as reading a batch of records at a time
versus one record at a time.


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-10 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r160880016
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -37,40 +35,58 @@ import org.apache.spark.sql.types.StructType
  */
 case class DataSourceV2ScanExec(
     fullOutput: Seq[AttributeReference],
-    @transient reader: DataSourceV2Reader) extends LeafExecNode with DataSourceReaderHolder {
+    @transient reader: DataSourceV2Reader)
+  extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
 
   override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]
 
-  override def references: AttributeSet = AttributeSet.empty
-
-  override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    val readTasks: java.util.List[ReadTask[UnsafeRow]] = reader match {
-      case r: SupportsScanUnsafeRow => r.createUnsafeRowReadTasks()
-      case _ =>
-        reader.createReadTasks().asScala.map {
-          new RowToUnsafeRowReadTask(_, reader.readSchema()): ReadTask[UnsafeRow]
-        }.asJava
-    }
+  override def producedAttributes: AttributeSet = AttributeSet(fullOutput)
+
+  private lazy val inputRDD: RDD[InternalRow] = reader match {
+    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
+      assert(!reader.isInstanceOf[ContinuousReader],
+        "continuous stream reader does not support columnar read yet.")
+      new DataSourceRDD(sparkContext, r.createBatchReadTasks()).asInstanceOf[RDD[InternalRow]]
+
+    case _ =>
+      val readTasks: java.util.List[ReadTask[UnsafeRow]] = reader match {
+        case r: SupportsScanUnsafeRow => r.createUnsafeRowReadTasks()
+        case _ =>
+          reader.createReadTasks().asScala.map {
+            new RowToUnsafeRowReadTask(_, reader.readSchema()): ReadTask[UnsafeRow]
+          }.asJava
+      }
+
+      reader match {
--- End diff --

This looks a bit messy. Can we move `readTasks` out as a lazy val? Then we
would have:
```
private lazy val readTasks = ..
private lazy val inputRDD: RDD[InternalRow] = reader match {
  case r: SupportsScanColumnarBatch if r.enableBatchRead() => ..
  case _: ContinuousReader => ..
  case _ => ..
}
```
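
The same refactoring idea can be sketched in Java, purely as an illustration. Everything here is hypothetical: `SketchReader` and its sub-interfaces stand in for the reader interfaces, `Lazy` plays the role of a Scala `lazy val`, and strings stand in for RDDs. The sketch shows one flat dispatch chain whose two non-batch branches share a task list that is computed at most once.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-ins for the reader interfaces; none of these are Spark classes.
interface SketchReader {
  List<String> createReadTasks();
}

interface SketchBatchReader extends SketchReader {
  List<String> createBatchReadTasks();
  default boolean enableBatchRead() { return true; }
}

interface SketchContinuousReader extends SketchReader {}

// Minimal memoizing supplier, playing the role of a Scala `lazy val`.
final class Lazy<T> implements Supplier<T> {
  private final Supplier<T> init;
  private T value;
  private boolean done;

  Lazy(Supplier<T> init) { this.init = init; }

  @Override
  public synchronized T get() {
    if (!done) {
      value = init.get();
      done = true;
    }
    return value;
  }
}

final class ScanPlan {
  private final SketchReader reader;
  // Computed at most once, and only if a non-batch branch asks for it.
  private final Lazy<List<String>> readTasks;

  ScanPlan(SketchReader reader) {
    this.reader = reader;
    this.readTasks = new Lazy<>(reader::createReadTasks);
  }

  // One flat chain instead of a nested match: batch, then continuous, then rows.
  String inputRdd() {
    if (reader instanceof SketchBatchReader
        && ((SketchBatchReader) reader).enableBatchRead()) {
      return "batch:" + ((SketchBatchReader) reader).createBatchReadTasks();
    } else if (reader instanceof SketchContinuousReader) {
      return "continuous:" + readTasks.get();
    } else {
      return "row:" + readTasks.get();
    }
  }
}

class LazyScanDemo {
  public static void main(String[] args) {
    SketchReader rowReader = () -> Arrays.asList("t0", "t1");
    System.out.println(new ScanPlan(rowReader).inputRdd());
  }
}
```

Because the task list is lazy, the batch branch never calls `createReadTasks()`, which matters when that method throws by default.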


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-10 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r160877601
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.List;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to output {@link ColumnarBatch} and make the scan faster.
+ */
+@InterfaceStability.Evolving
+public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
+  @Override
+  default List<ReadTask<Row>> createReadTasks() {
+    throw new IllegalStateException(
+      "createReadTasks should not be called with SupportsScanColumnarBatch.");
--- End diff --

Suggest `createReadTasks not supported by default within
SupportsScanColumnarBatch.` instead, since we allow users to fall back to the
normal read path.
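
To make the fallback concrete, here is a small Java sketch under stated assumptions. `RowReader` and `ColumnarReader` are hypothetical stand-ins for `DataSourceV2Reader` and `SupportsScanColumnarBatch`, tasks are plain strings, and the "only `int` columns are batch-capable" rule is invented for the example. A batch-only reader inherits the throwing default, while a reader that wants a fallback overrides both `enableBatchRead()` and `createReadTasks()`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for DataSourceV2Reader.
interface RowReader {
  List<String> createReadTasks();
}

// Hypothetical stand-in for SupportsScanColumnarBatch.
interface ColumnarReader extends RowReader {
  @Override
  default List<String> createReadTasks() {
    throw new IllegalStateException(
      "createReadTasks not supported by default within ColumnarReader.");
  }

  List<String> createBatchReadTasks();

  default boolean enableBatchRead() { return true; }
}

// Batch-only reader: inherits both defaults, so the row path is unavailable.
class BatchOnlyReader implements ColumnarReader {
  @Override
  public List<String> createBatchReadTasks() {
    return Arrays.asList("batch-0");
  }
}

// Reader with a fallback: overrides both methods so the row path works too.
class FallbackReader implements ColumnarReader {
  private final List<String> columnTypes;

  FallbackReader(List<String> columnTypes) { this.columnTypes = columnTypes; }

  @Override
  public boolean enableBatchRead() {
    // Assumed rule for this sketch: only int columns can be read in batches.
    return columnTypes.stream().allMatch("int"::equals);
  }

  @Override
  public List<String> createBatchReadTasks() {
    return Arrays.asList("batch-0");
  }

  @Override
  public List<String> createReadTasks() {
    return Arrays.asList("row-0");
  }
}

class FallbackDemo {
  public static void main(String[] args) {
    FallbackReader mixed = new FallbackReader(Arrays.asList("int", "string"));
    System.out.println(mixed.enableBatchRead());
    System.out.println(mixed.createReadTasks());
  }
}
```

With this shape, "not supported by default" describes the inherited behavior accurately, since an overriding reader may legitimately be asked for row tasks.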


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-10 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r160764573
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.List;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to output {@link ColumnarBatch} and make the scan faster.
+ */
+@InterfaceStability.Evolving
+public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
+  @Override
+  default List<ReadTask<Row>> createReadTasks() {
+    throw new IllegalStateException(
+      "createReadTasks should not be called with SupportsScanColumnarBatch.");
+  }
+
+  /**
+   * Similar to {@link DataSourceV2Reader#createReadTasks()}, but returns columnar data in batches.
+   */
+  List<ReadTask<ColumnarBatch>> createBatchReadTasks();
+
+  /**
+   * A safety door for columnar batch reader. It's possible that the implementation can only support
+   * some certain columns with certain types. Users can overwrite this method and
+   * {@link #createReadTasks()} to fallback to normal read path under some conditions.
+   */
+  default boolean enableBatchRead() {
--- End diff --

If it controls batch mode or non-batch mode, I agree.

IIUC, this value shows whether we read data from column-oriented storage
(e.g. `ColumnVector`) or row-oriented storage (e.g. `UnsafeRow`). I feel that
this is not a batch mode.


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-10 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r160747322
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.List;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to output {@link ColumnarBatch} and make the scan faster.
+ */
+@InterfaceStability.Evolving
+public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
+  @Override
+  default List<ReadTask<Row>> createReadTasks() {
+    throw new IllegalStateException(
+      "createReadTasks should not be called with SupportsScanColumnarBatch.");
+  }
+
+  /**
+   * Similar to {@link DataSourceV2Reader#createReadTasks()}, but returns columnar data in batches.
+   */
+  List<ReadTask<ColumnarBatch>> createBatchReadTasks();
+
+  /**
+   * A safety door for columnar batch reader. It's possible that the implementation can only support
+   * some certain columns with certain types. Users can overwrite this method and
+   * {@link #createReadTasks()} to fallback to normal read path under some conditions.
+   */
+  default boolean enableBatchRead() {
--- End diff --

This name is more general. It looks fine to me. In the future, if we support
another batch read mode, we can add an extra function to further identify the
batch mode.


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-10 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r160746791
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.List;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to output {@link ColumnarBatch} and make the scan faster.
--- End diff --

We need to explain the precedence of `SupportsScanColumnarBatch` and
`SupportsScanUnsafeRow`.
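
As an illustration of what such a precedence note would describe, the Java sketch below mirrors the order in which the `DataSourceV2ScanExec` diff in this thread checks the reader: columnar batch first (when `enableBatchRead()` is true), then unsafe row, then plain row. The interfaces, the `ScanMode` enum and `ScanModes.choose` are hypothetical names for this sketch, not Spark APIs.

```java
// Hypothetical stand-ins for DataSourceV2Reader and its two scan mix-ins.
interface V2Reader {}

interface ScansColumnarBatch extends V2Reader {
  default boolean enableBatchRead() { return true; }
}

interface ScansUnsafeRow extends V2Reader {}

enum ScanMode { COLUMNAR_BATCH, UNSAFE_ROW, ROW }

final class ScanModes {
  private ScanModes() {}

  // Columnar batch wins when enabled; unsafe row is the next choice; row is the default.
  static ScanMode choose(V2Reader reader) {
    if (reader instanceof ScansColumnarBatch
        && ((ScansColumnarBatch) reader).enableBatchRead()) {
      return ScanMode.COLUMNAR_BATCH;
    }
    if (reader instanceof ScansUnsafeRow) {
      return ScanMode.UNSAFE_ROW;
    }
    return ScanMode.ROW;
  }
}

// A reader that implements both mix-ins, with a switch for the batch path.
final class BothReader implements ScansColumnarBatch, ScansUnsafeRow {
  private final boolean batchEnabled;

  BothReader(boolean batchEnabled) { this.batchEnabled = batchEnabled; }

  @Override
  public boolean enableBatchRead() { return batchEnabled; }
}

class PrecedenceDemo {
  public static void main(String[] args) {
    System.out.println(ScanModes.choose(new BothReader(true)));
    System.out.println(ScanModes.choose(new BothReader(false)));
    System.out.println(ScanModes.choose(new V2Reader() {}));
  }
}
```

A reader that implements both mix-ins and disables batch reads therefore lands on the unsafe-row path rather than the plain row path.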


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-09 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r160477594
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala 
---
@@ -17,21 +17,24 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 
 
 /**
- * Helper trait for abstracting scan functionality using
- * [[ColumnarBatch]]es.
+ * Helper trait for abstracting scan functionality using [[ColumnarBatch]]es.
  */
 private[sql] trait ColumnarBatchScan extends CodegenSupport {
 
   def vectorTypes: Option[Seq[String]] = None
 
+  protected def supportsBatch: Boolean = true
--- End diff --

`supportColumnar()` or `supportColumnarBatch()`?


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-09 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r160477447
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.List;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
+ * interface to output {@link ColumnarBatch} and make the scan faster.
+ */
+@InterfaceStability.Evolving
+public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
+  @Override
+  default List<ReadTask<Row>> createReadTasks() {
+    throw new IllegalStateException(
+      "createReadTasks should not be called with SupportsScanColumnarBatch.");
+  }
+
+  /**
+   * Similar to {@link DataSourceV2Reader#createReadTasks()}, but returns columnar data in batches.
+   */
+  List<ReadTask<ColumnarBatch>> createBatchReadTasks();
+
+  /**
+   * A safety door for columnar batch reader. It's possible that the implementation can only support
+   * some certain columns with certain types. Users can overwrite this method and
+   * {@link #createReadTasks()} to fallback to normal read path under some conditions.
+   */
+  default boolean enableBatchRead() {
--- End diff --

`enableColumnarRead()` or `enableColumnarBatchRead()`?
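
For readers following the naming discussion, here is a rough Java sketch of how the "safety door" described in the javadoc above might be used: a reader overrides both `enableBatchRead()` and `createReadTasks()` so that it falls back to the row path when the schema contains a type it cannot vectorize. `SketchBatchReader`, `PrimitiveOnlyReader`, `FallbackDemo` and the string-based schema are all hypothetical simplifications, not the Spark API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class FallbackDemo {
  // Mimics what a scan node could do: ask the reader which path to take.
  static List<String> plan(SketchBatchReader reader) {
    return reader.enableBatchRead()
        ? reader.createBatchReadTasks()
        : reader.createReadTasks();
  }

  public static void main(String[] args) {
    System.out.println(plan(new PrimitiveOnlyReader(Arrays.asList("int", "long"))));
    System.out.println(plan(new PrimitiveOnlyReader(Arrays.asList("int", "array"))));
  }
}

// Hypothetical, simplified model of the interface under review.
interface SketchBatchReader {
  // By default the row path is unavailable, as in the quoted diff.
  default List<String> createReadTasks() {
    throw new IllegalStateException(
        "createReadTasks should not be called with SupportsScanColumnarBatch.");
  }

  List<String> createBatchReadTasks();

  default boolean enableBatchRead() {
    return true;
  }
}

// Hypothetical reader that can only vectorize a few primitive column types.
final class PrimitiveOnlyReader implements SketchBatchReader {
  private static final Set<String> VECTORIZABLE =
      new HashSet<>(Arrays.asList("int", "long", "double"));

  private final List<String> columnTypes;

  PrimitiveOnlyReader(List<String> columnTypes) {
    this.columnTypes = columnTypes;
  }

  // Batch reading is enabled only when every column type is vectorizable.
  @Override
  public boolean enableBatchRead() {
    return VECTORIZABLE.containsAll(columnTypes);
  }

  // Overridden so the fallback path does not hit the default exception.
  @Override
  public List<String> createReadTasks() {
    return Collections.singletonList("row-task");
  }

  @Override
  public List<String> createBatchReadTasks() {
    return Collections.singletonList("batch-task");
  }
}
```

The sketch also shows why the two overrides go together: a reader that disables batch reading without overriding `createReadTasks()` would hit the default `IllegalStateException`.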


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-09 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r160472490
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala 
---
@@ -137,4 +147,25 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
  """.stripMargin
   }
 
+  private def produceRows(ctx: CodegenContext, input: String): String = {
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    val row = ctx.freshName("row")
+
+    ctx.INPUT_ROW = row
+    ctx.currentVars = null
+    // Always provide `outputVars`, so that the framework can help us build unsafe row if the input
+    // row is not unsafe row, i.e. `needsUnsafeRowConversion` is true.
+    val outputVars = output.zipWithIndex.map{ case (a, i) =>
--- End diff --

nit: `map {`


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r159678434
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -37,40 +35,58 @@ import org.apache.spark.sql.types.StructType
  */
 case class DataSourceV2ScanExec(
     fullOutput: Seq[AttributeReference],
-    @transient reader: DataSourceV2Reader) extends LeafExecNode with DataSourceReaderHolder {
+    @transient reader: DataSourceV2Reader)
+  extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
 
   override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]
 
-  override def references: AttributeSet = AttributeSet.empty
-
-  override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    val readTasks: java.util.List[ReadTask[UnsafeRow]] = reader match {
-      case r: SupportsScanUnsafeRow => r.createUnsafeRowReadTasks()
-      case _ =>
-        reader.createReadTasks().asScala.map {
-          new RowToUnsafeRowReadTask(_, reader.readSchema()): ReadTask[UnsafeRow]
-        }.asJava
-    }
+  override def producedAttributes: AttributeSet = AttributeSet(fullOutput)
+
+  private lazy val inputRDD: RDD[InternalRow] = reader match {
+    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
+      assert(!reader.isInstanceOf[ContinuousReader],
+        "continuous stream reader does not support columnar read yet.")
+      new DataSourceRDD(sparkContext, r.createBatchReadTasks()).asInstanceOf[RDD[InternalRow]]
--- End diff --

cc @zsxwing: can streaming technically support a columnar batch reader?


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r159678205
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 ---
@@ -230,43 +274,10 @@ case class InMemoryTableScanExec(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-val numOutputRows = longMetric("numOutputRows")
--- End diff --

This is moved to `inputRDD`


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20153#discussion_r159678110
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -346,33 +348,6 @@ case class FileSourceScanExec(
 
   override val nodeNamePrefix: String = "File"
 
-  override protected def doProduce(ctx: CodegenContext): String = {
--- End diff --

This is moved to `ColumnarBatchScan.produceRows`


---




[GitHub] spark pull request #20153: [SPARK-22392][SQL] data source v2 columnar batch ...

2018-01-04 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/20153

[SPARK-22392][SQL] data source v2 columnar batch reader

## What changes were proposed in this pull request?

Add a new Data Source V2 interface that allows a data source to return
`ColumnarBatch` during the scan.

## How was this patch tested?

New tests were added.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark columnar-reader

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20153.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20153






---
