[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...

2018-04-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r181735802
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java ---
@@ -18,18 +18,26 @@
 package org.apache.spark.sql.sources.v2.reader;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousDataReader;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 /**
  * A mix-in interface for {@link DataReaderFactory}. Continuous data reader factories can
  * implement this interface to provide creating {@link DataReader} with particular offset.
  */
 @InterfaceStability.Evolving
-public interface ContinuousDataReaderFactory<T> extends DataReaderFactory<T> {
-  /**
-   * Create a DataReader with particular offset as its startOffset.
-   *
-   * @param offset offset want to set as the DataReader's startOffset.
-   */
-  DataReader<T> createDataReaderWithOffset(PartitionOffset offset);
--- End diff --

cc @jose-torres, it seems this method is never used.


---




[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...

2018-04-11 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r180796522
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -95,21 +77,29 @@ case class DataSourceV2ScanExec(
           sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
           sparkContext.env)
         .askSync[Unit](SetReaderPartitions(readerFactories.size))
-      new ContinuousDataSourceRDD(sparkContext, sqlContext, readerFactories)
-        .asInstanceOf[RDD[InternalRow]]
-
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-      new DataSourceRDD(sparkContext, batchReaderFactories).asInstanceOf[RDD[InternalRow]]
-
+      if (readerFactories.exists(_.dataFormat() == DataFormat.COLUMNAR_BATCH)) {
+        throw new IllegalArgumentException(
+          "continuous stream reader does not support columnar read yet.")
--- End diff --

Then the missing piece is codegen. This is difficult because the continuous 
stream reader does a lot of auxiliary work, so I don't know if it will happen 
in the near future.


---




[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...

2018-04-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r180705366
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -143,7 +144,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))
 
       newBlocks.map { block =>
-        new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory[UnsafeRow]]
+        new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory]
--- End diff --

Array is a java-friendly type.
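For illustration only (a sketch with stand-in types, not the real v2 interfaces), an array-based shape would look roughly like this:

    // Sketch: stand-in types, not the actual Spark classes.
    trait DataReaderFactory                      // factory without a type parameter
    trait Scan { def createReaderFactories(): Array[DataReaderFactory] }

    // Scala implementations can build the Array directly, with no
    // java.util.List conversion or element-wise casts:
    class MyScan extends Scan {
      override def createReaderFactories(): Array[DataReaderFactory] =
        Array(new DataReaderFactory {}, new DataReaderFactory {})
    }
    // A Java implementation would simply return new DataReaderFactory[] { ... }.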


---




[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...

2018-04-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r180705219
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -95,21 +77,29 @@ case class DataSourceV2ScanExec(
           sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
           sparkContext.env)
         .askSync[Unit](SetReaderPartitions(readerFactories.size))
-      new ContinuousDataSourceRDD(sparkContext, sqlContext, readerFactories)
-        .asInstanceOf[RDD[InternalRow]]
-
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-      new DataSourceRDD(sparkContext, batchReaderFactories).asInstanceOf[RDD[InternalRow]]
-
+      if (readerFactories.exists(_.dataFormat() == DataFormat.COLUMNAR_BATCH)) {
+        throw new IllegalArgumentException(
+          "continuous stream reader does not support columnar read yet.")
--- End diff --

We use a type-erasure hack: we lie to the Scala compiler that we are outputting InternalRow, and at runtime the generated code casts the data back to ColumnarBatch.
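As a standalone illustration of the trick (plain Scala collections standing in for RDDs, toy types instead of InternalRow and ColumnarBatch):

    // Toy stand-ins for InternalRow and ColumnarBatch.
    final case class FakeRow(value: Int)
    final case class FakeBatch(rows: Seq[FakeRow])

    val batches: Seq[FakeBatch] = Seq(FakeBatch(Seq(FakeRow(1), FakeRow(2))))

    // The "lie": generics are erased at runtime, so this unchecked cast
    // succeeds even though the elements are really batches, not rows.
    val erased: Seq[FakeRow] = batches.asInstanceOf[Seq[FakeRow]]

    // Only code that knows the secret (generated code, in Spark's case)
    // casts back before touching the elements.
    val recovered: Seq[FakeBatch] = erased.asInstanceOf[Seq[FakeBatch]]
    println(recovered.head.rows.map(_.value).sum)   // prints 3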


---




[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...

2018-04-10 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r180489842
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -143,7 +144,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))
 
       newBlocks.map { block =>
-        new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory[UnsafeRow]]
+        new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory]
--- End diff --

I'd like that, but I don't know if that would make things harder for data 
source implementers working in Java.


---




[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...

2018-04-10 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r180489396
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -95,21 +77,29 @@ case class DataSourceV2ScanExec(
           sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
           sparkContext.env)
         .askSync[Unit](SetReaderPartitions(readerFactories.size))
-      new ContinuousDataSourceRDD(sparkContext, sqlContext, readerFactories)
-        .asInstanceOf[RDD[InternalRow]]
-
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-      new DataSourceRDD(sparkContext, batchReaderFactories).asInstanceOf[RDD[InternalRow]]
-
+      if (readerFactories.exists(_.dataFormat() == DataFormat.COLUMNAR_BATCH)) {
+        throw new IllegalArgumentException(
+          "continuous stream reader does not support columnar read yet.")
--- End diff --

I don't, because I'm not really sure how it works in the batch case. How does

    new DataSourceRDD(sparkContext, batchReaderFactories).asInstanceOf[RDD[InternalRow]]

work when the type parameter of batchReaderFactories doesn't match InternalRow?


---




[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...

2018-04-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r180468202
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -143,7 +144,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))
 
       newBlocks.map { block =>
-        new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory[UnsafeRow]]
+        new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory]
--- End diff --

cc @rdblue @jose-torres 


---




[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...

2018-04-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r180468097
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -143,7 +144,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))
 
       newBlocks.map { block =>
-        new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory[UnsafeRow]]
+        new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory]
--- End diff --

I have seen this pattern many times; the Java `List` is a little troublesome because it's invariant. Shall we change the interface to use an array?
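For illustration (toy names, not the real interfaces), the invariance problem and the resulting cast look like this:

    import java.util.{List => JList}
    import scala.collection.JavaConverters._

    trait ReaderFactory[T]                              // stand-in for DataReaderFactory[T]
    class MemoryFactory extends ReaderFactory[String]   // stand-in for MemoryStreamDataReaderFactory

    val factories: Seq[MemoryFactory] = Seq(new MemoryFactory)

    // Does not compile: java.util.List is invariant, so a
    // List[MemoryFactory] is not a List[ReaderFactory[String]].
    // val asApi: JList[ReaderFactory[String]] = factories.asJava

    // Hence the element-wise asInstanceOf seen in memory.scala above:
    val asApi: JList[ReaderFactory[String]] =
      factories.map(_.asInstanceOf[ReaderFactory[String]]).asJava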


---




[GitHub] spark pull request #21029: [WIP][SPARK-23952] remove type parameter in DataR...

2018-04-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21029#discussion_r180467422
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -95,21 +77,29 @@ case class DataSourceV2ScanExec(
           sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
           sparkContext.env)
         .askSync[Unit](SetReaderPartitions(readerFactories.size))
-      new ContinuousDataSourceRDD(sparkContext, sqlContext, readerFactories)
-        .asInstanceOf[RDD[InternalRow]]
-
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-      new DataSourceRDD(sparkContext, batchReaderFactories).asInstanceOf[RDD[InternalRow]]
-
+      if (readerFactories.exists(_.dataFormat() == DataFormat.COLUMNAR_BATCH)) {
+        throw new IllegalArgumentException(
+          "continuous stream reader does not support columnar read yet.")
--- End diff --

cc @jose-torres, do you know what's missing for this?


---
