[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-11-19 Thread cloud-fan
Github user cloud-fan closed the pull request at:

https://github.com/apache/spark/pull/22547


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-11-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r230989559
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala
 ---
@@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType
  *   scenarios, where some offsets after the specified 
initial ones can't be
  *   properly read.
  */
-class KafkaContinuousReadSupport(
+class KafkaContinuousInputStream(
--- End diff --

Yea, I'll separate this PR into 3 smaller ones once we have agreed on the high-level design at 
https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-11-05 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r230973917
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala
 ---
@@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType
  *   scenarios, where some offsets after the specified 
initial ones can't be
  *   properly read.
  */
-class KafkaContinuousReadSupport(
+class KafkaContinuousInputStream(
--- End diff --

Makes sense. I really consider this to be a blocker on getting this merged 
and approved. It's difficult to have confidence in a review over such a large 
change. Thoughts @cloud-fan @rdblue?


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-11-02 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r230528510
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala
 ---
@@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType
  *   scenarios, where some offsets after the specified 
initial ones can't be
  *   properly read.
  */
-class KafkaContinuousReadSupport(
+class KafkaContinuousInputStream(
--- End diff --

I'd prefer that the commits themselves compile, but since this is 
separating the modes I think it could be done incrementally.


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-11-02 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r230505785
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala
 ---
@@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType
  *   scenarios, where some offsets after the specified 
initial ones can't be
  *   properly read.
  */
-class KafkaContinuousReadSupport(
+class KafkaContinuousInputStream(
--- End diff --

+1 for this. A lot of the changes right now are for moving around the 
streaming code especially, which makes it harder to isolate just the proposed 
API for review.

An alternative is to split this PR into separate commits: the individual commits may not compile because of mismatching signatures, but all the commits taken together would compile, and each commit could be reviewed on its own to assess the API and then the implementation.

For example, I'd propose 3 PRs:

* Batch reading, with a commit for the interface changes and a separate commit for the implementation changes
* Micro-batch streaming read, with a commit for the interface changes and a separate commit for the implementation changes
* Continuous streaming read, similar to the above

Thoughts?


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226812577
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/Format.java ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The base interface for data source v2. Implementations must have a 
public, 0-arg constructor.
+ *
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ */
+@InterfaceStability.Evolving
+public interface Format extends DataSourceV2 {
--- End diff --

The write API has not been migrated yet and still needs `DataSourceV2`.


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226798538
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/Format.java ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The base interface for data source v2. Implementations must have a 
public, 0-arg constructor.
+ *
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ */
+@InterfaceStability.Evolving
+public interface Format extends DataSourceV2 {
+
+  /**
+   * Return a {@link Table} instance to do read/write with user-specified 
options.
+   *
+   * @param options the user-specified options that can identify a table, 
e.g. path, table name,
--- End diff --

Why is it necessary to pass the table name and database to Format? Format should only be used in two places to create tables. First, in the DataFrameReader (or writer) API, when a format is specified directly instead of a catalog/database/table or catalog/path. Second, in catalogs that support pluggable implementations, like the current session catalog, which needs to dynamically instantiate implementations based on the table's provider.
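
For illustration, here is a minimal sketch of that split against the `Format`/`Table` interfaces as they appear in this PR; the format class, table class, and option name below are made up, and the point is only that `getTable` needs nothing beyond options that locate the data (e.g. a path):

```scala
import org.apache.spark.sql.sources.v2.{DataSourceOptions, Format, Table}
import org.apache.spark.sql.types.StructType

// Hypothetical file-based format: a path option is enough to resolve a Table.
class ExamplePathFormat extends Format {
  override def getTable(options: DataSourceOptions): Table = {
    require(options.get("path").isPresent, "'path' must be specified")
    new ExamplePathTable(options.get("path").get())
  }
}

// Hypothetical Table: in a real source the schema would be inferred from the files.
class ExamplePathTable(path: String) extends Table {
  override def schema(): StructType = new StructType().add("id", "long")
}
```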


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226798213
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/Format.java ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The base interface for data source v2. Implementations must have a 
public, 0-arg constructor.
+ *
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ */
+@InterfaceStability.Evolving
+public interface Format extends DataSourceV2 {
--- End diff --

Why is there both Format and DataSourceV2? What does DataSourceV2 do?


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226796934
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -173,12 +185,17 @@ object DataSourceV2Relation {
   source: DataSourceV2,
   options: Map[String, String],
   tableIdent: Option[TableIdentifier] = None,
-  userSpecifiedSchema: Option[StructType] = None): 
DataSourceV2Relation = {
-val readSupport = source.createReadSupport(options, 
userSpecifiedSchema)
-val output = readSupport.fullSchema().toAttributes
+  userSpecifiedSchema: Option[StructType] = None): 
Option[DataSourceV2Relation] = {
--- End diff --

This shouldn't return an option. A relation is not a read-side structure; it is also used in write-side logical plans as the target of a write. Validation rules like PreprocessTableInsertion validate the write dataframe against the relation's schema. That's why the relation has a newWriteSupport method.

Creating a relation from a Table should always work, even if the table 
isn't readable or isn't writable. Analysis can be done later to validate 
whether the plan that contains a relation can actually use the table.
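
As a rough sketch of that alternative (using an illustrative stand-in rather than the real `DataSourceV2Relation` constructor), construction from a `Table` never fails and the capability check moves to analysis:

```scala
import org.apache.spark.sql.sources.v2.{SupportsBatchRead, Table}

// Illustrative stand-in for DataSourceV2Relation: creating it always succeeds.
case class ExampleRelation(table: Table, options: Map[String, String])

object ExampleRelation {
  // No Option in the return type: the relation can always be constructed.
  def create(table: Table, options: Map[String, String]): ExampleRelation =
    ExampleRelation(table, options)

  // Readability is validated later, e.g. by an analysis rule on the read path.
  def validateBatchRead(relation: ExampleRelation): Unit = relation.table match {
    case _: SupportsBatchRead => // ok, the table can be scanned in batch queries
    case other => throw new IllegalArgumentException(
      s"${other.getClass.getName} does not support batch reads")
  }
}
```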


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226790252
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/InputStream.java
 ---
@@ -17,14 +17,18 @@
 
 package org.apache.spark.sql.sources.v2.reader.streaming;
 
-import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
 
 /**
- * A base interface for streaming read support. This is package private 
and is invisible to data
- * sources. Data sources should implement concrete streaming read support 
interfaces:
- * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ * An interface representing a readable data stream in a streaming query. 
It's responsible to manage
+ * the offsets of the streaming source in this streaming query.
+ *
+ * Data sources should implement concrete input stream interfaces: {@link 
MicroBatchInputStream} and
+ * {@link ContinuousInputStream}.
  */
-interface StreamingReadSupport extends ReadSupport {
+@InterfaceStability.Evolving
+public interface InputStream extends BaseStreamingSource {
--- End diff --

`InputStream` conflicts with a well-known JVM class, 
[`java.io.InputStream`](https://docs.oracle.com/javase/9/docs/api/java/io/InputStream.html).
 I think this should be renamed to be more specific to a streaming table scan.


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226789748
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java 
---
@@ -15,37 +15,43 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.sources.v2.reader;
+package org.apache.spark.sql.sources.v2;
 
 import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.datasources.v2.NoopScanConfigBuilder;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+import org.apache.spark.sql.types.StructType;
 
 /**
- * An interface that defines how to load the data from data source for 
batch processing.
+ * An interface representing a logical structured data set of a data 
source. For example, the
+ * implementation can be a directory on the file system, or a table in the 
catalog, etc.
  *
- * The execution engine will get an instance of this interface from a data 
source provider
- * (e.g. {@link org.apache.spark.sql.sources.v2.BatchReadSupportProvider}) 
at the start of a batch
- * query, then call {@link #newScanConfigBuilder()} and create an instance 
of {@link ScanConfig}.
- * The {@link ScanConfigBuilder} can apply operator pushdown and keep the 
pushdown result in
- * {@link ScanConfig}. The {@link ScanConfig} will be used to create input 
partitions and reader
- * factory to scan data from the data source with a Spark job.
+ * This interface can mixin the following interfaces to support different 
operations:
+ * 
+ *   {@link SupportsBatchRead}: this table can be read in batch 
queries.
+ *   {@link SupportsMicroBatchRead}: this table can be read in 
streaming queries with
+ *   micro-batch trigger.
+ *   {@link SupportsContinuousRead}: this table can be read in 
streaming queries with
+ *   continuous trigger.
+ * 
  */
 @InterfaceStability.Evolving
-public interface BatchReadSupport extends ReadSupport {
+public interface Table {
+
+  /**
+   * Returns the schema of this table.
+   */
+  StructType schema();
 
   /**
* Returns a builder of {@link ScanConfig}. Spark will call this method 
and create a
* {@link ScanConfig} for each data scanning job.
*
* The builder can take some query specific information to do operators 
pushdown, and keep these
* information in the created {@link ScanConfig}.
-   *
-   * This is the first step of the data scan. All other methods in {@link 
BatchReadSupport} needs
-   * to take {@link ScanConfig} as an input.
-   */
-  ScanConfigBuilder newScanConfigBuilder();
-
-  /**
-   * Returns a factory, which produces one {@link PartitionReader} for one 
{@link InputPartition}.
*/
-  PartitionReaderFactory createReaderFactory(ScanConfig config);
+  default ScanConfigBuilder newScanConfigBuilder(DataSourceOptions 
options) {
--- End diff --

I think it should be clear that these are scan-specific options. Maybe add 
some documentation with an example of something that would be passed to 
configure a scan, like a target split size for combining.
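
For example (a sketch only; the option name and the 128 MB value are invented, and it assumes `ScanConfigBuilder.build()` stays as in the current API), a caller could pass a combining hint purely as a scan-level option:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.{DataSourceOptions, Table}
import org.apache.spark.sql.sources.v2.reader.ScanConfig

object ScanOptionsSketch {
  // Hypothetical scan-specific tuning option: a target split size used when combining small files.
  def buildScanWithSplitHint(table: Table): ScanConfig = {
    val scanOptions = new DataSourceOptions(
      Map("targetSplitSizeBytes" -> (128L * 1024 * 1024).toString).asJava)
    // The builder can read the hint and keep the resulting plan in the ScanConfig it builds.
    table.newScanConfigBuilder(scanOptions).build()
  }
}
```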


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226789610
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java 
---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.BatchScan;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * A mix-in interface for {@link Table}. Table implementations can mixin 
this interface to
+ * provide data reading ability for batch processing.
+ */
+@InterfaceStability.Evolving
+public interface SupportsBatchRead extends Table {
+
+  /**
+   * Creates a {@link BatchScan} instance with a {@link ScanConfig} and 
user-specified options.
+   *
+   * @param config a {@link ScanConfig} which may contains operator 
pushdown information.
+   * @param options the user-specified options, which is same as the one 
used to create the
+   *{@link ScanConfigBuilder} that built the given {@link 
ScanConfig}.
+   */
+  BatchScan createBatchScan(ScanConfig config, DataSourceOptions options);
--- End diff --

Is there a benefit to having both `ScanConfig` and `BatchScan` objects? Why 
not have `ScanConfigBuilder` return a `BatchScan` directly by calling 
`buildBatch`?
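
A sketch of the collapsed shape being suggested, with made-up trait and method names (`buildBatch` does not exist in this PR):

```scala
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory}

// Hypothetical: the builder applies pushdown and then produces the batch scan directly,
// instead of a separate SupportsBatchRead.createBatchScan(ScanConfig, options) step.
trait ExampleScanBuilder {
  def pushFilters(filters: Array[Filter]): Array[Filter] // returns the filters it cannot handle
  def buildBatch(): ExampleBatchScan
}

trait ExampleBatchScan {
  def planInputPartitions(): Array[InputPartition]
  def createReaderFactory(): PartitionReaderFactory
}
```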


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226785695
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchScan.java ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.SupportsBatchRead;
+import org.apache.spark.sql.sources.v2.Table;
+
+/**
+ * A {@link Scan} for batch queries.
+ *
+ * The execution engine will get an instance of {@link Table} first, then 
call
+ * {@link Table#newScanConfigBuilder(DataSourceOptions)} and create an 
instance of
+ * {@link ScanConfig}. The {@link ScanConfigBuilder} can apply operator 
pushdown and keep the
+ * pushdown result in {@link ScanConfig}. Then
+ * {@link SupportsBatchRead#createBatchScan(ScanConfig, 
DataSourceOptions)} will be called to create
+ * a {@link BatchScan} instance, which will be used to create input 
partitions and reader factory to
+ * scan data from the data source with a Spark job.
+ */
+@InterfaceStability.Evolving
+public interface BatchScan extends Scan {
+
+  /**
+   * Returns a factory, which produces one {@link PartitionReader} for one 
{@link InputPartition}.
+   */
+  PartitionReaderFactory createReaderFactory();
--- End diff --

Why are `BatchScan` and `PartitionReaderFactory` different interfaces?


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226784919
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java 
---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.BatchScan;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * A mix-in interface for {@link Table}. Table implementations can mixin 
this interface to
+ * provide data reading ability for batch processing.
+ */
+@InterfaceStability.Evolving
+public interface SupportsBatchRead extends Table {
+
+  /**
+   * Creates a {@link BatchScan} instance with a {@link ScanConfig} and 
user-specified options.
+   *
+   * @param config a {@link ScanConfig} which may contains operator 
pushdown information.
+   * @param options the user-specified options, which is same as the one 
used to create the
+   *{@link ScanConfigBuilder} that built the given {@link 
ScanConfig}.
--- End diff --

I don't think that options should be passed twice.


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226783272
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -106,85 +107,96 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
   failOnDataLoss(caseInsensitiveParams))
   }
 
-  /**
-   * Creates a 
[[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to 
read
-   * batches of Kafka data in a micro-batch streaming query.
-   */
-  override def createMicroBatchReadSupport(
-  metadataPath: String,
-  options: DataSourceOptions): KafkaMicroBatchReadSupport = {
-
-val parameters = options.asMap().asScala.toMap
-validateStreamOptions(parameters)
-// Each running query should use its own group id. Otherwise, the 
query may be only assigned
-// partial data since Kafka will assign partitions to multiple 
consumers having the same group
-// id. Hence, we should generate a unique id for each query.
-val uniqueGroupId = 
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
-val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase(Locale.ROOT), v) }
-val specifiedKafkaParams =
-  parameters
-.keySet
-.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-.map { k => k.drop(6).toString -> parameters(k) }
-.toMap
-
-val startingStreamOffsets = 
KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
-  STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
-
-val kafkaOffsetReader = new KafkaOffsetReader(
-  strategy(caseInsensitiveParams),
-  kafkaParamsForDriver(specifiedKafkaParams),
-  parameters,
-  driverGroupIdPrefix = s"$uniqueGroupId-driver")
-
-new KafkaMicroBatchReadSupport(
-  kafkaOffsetReader,
-  kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
-  options,
-  metadataPath,
-  startingStreamOffsets,
-  failOnDataLoss(caseInsensitiveParams))
+  override def getTable(options: DataSourceOptions): KafkaTable.type = {
+KafkaTable
   }
 
-  /**
-   * Creates a 
[[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to 
read
-   * Kafka data in a continuous streaming query.
-   */
-  override def createContinuousReadSupport(
-  metadataPath: String,
-  options: DataSourceOptions): KafkaContinuousReadSupport = {
-val parameters = options.asMap().asScala.toMap
-validateStreamOptions(parameters)
-// Each running query should use its own group id. Otherwise, the 
query may be only assigned
-// partial data since Kafka will assign partitions to multiple 
consumers having the same group
-// id. Hence, we should generate a unique id for each query.
-val uniqueGroupId = 
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
-val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase(Locale.ROOT), v) }
-val specifiedKafkaParams =
-  parameters
-.keySet
-.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-.map { k => k.drop(6).toString -> parameters(k) }
-.toMap
+  object KafkaTable extends Table
--- End diff --

Why is `KafkaTable` an object, not a class? This doesn't seem to fit the abstraction.
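
For comparison, a class-based shape might look roughly like the sketch below (the class name and constructor parameters are illustrative; the `create*InputStream` bodies quoted above would move in unchanged), so per-query state lives on an instance rather than a singleton:

```scala
import org.apache.spark.sql.sources.v2.{SupportsContinuousRead, SupportsMicroBatchRead, Table}
import org.apache.spark.sql.types.StructType

// Illustrative: KafkaSourceProvider.getTable could return a fresh instance per call,
// carrying validated state as constructor parameters instead of relying on a shared object.
abstract class ExampleKafkaTable(
    sourceSchema: StructType,              // e.g. KafkaOffsetReader.kafkaSchema
    validatedParams: Map[String, String])  // options validated once in getTable()
  extends Table with SupportsMicroBatchRead with SupportsContinuousRead {

  override def schema(): StructType = sourceSchema

  // createMicroBatchInputStream / createContinuousInputStream would keep the bodies
  // quoted above, now able to use validatedParams instead of re-parsing options.
}
```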


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226782371
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala
 ---
@@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType
  *   scenarios, where some offsets after the specified 
initial ones can't be
  *   properly read.
  */
-class KafkaContinuousReadSupport(
+class KafkaContinuousInputStream(
--- End diff --

Is it possible to break this change into multiple PRs for batch, 
microbatch, and continuous? It's really large and it would be nice if we could 
get the changes in incrementally.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226363445
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -381,7 +390,7 @@ class StreamSuite extends StreamTest {
 
   test("insert an extraStrategy") {
 try {
-  spark.experimental.extraStrategies = TestStrategy :: Nil
+  spark.experimental.extraStrategies = CustomStrategy :: Nil
--- End diff --

Since we need to do temporary planning for streaming queries, we can't allow a custom strategy to remove streaming leaf nodes.


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226363020
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -154,21 +159,25 @@ class StreamSuite extends StreamTest {
   }
 
   test("SPARK-20432: union one stream with itself") {
-val df = 
spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
-val unioned = df.union(df)
-withTempDir { outputDir =>
-  withTempDir { checkpointDir =>
-val query =
-  unioned
-.writeStream.format("parquet")
-.option("checkpointLocation", checkpointDir.getAbsolutePath)
-.start(outputDir.getAbsolutePath)
-try {
-  query.processAllAvailable()
-  val outputDf = 
spark.read.parquet(outputDir.getAbsolutePath).as[Long]
-  checkDatasetUnorderly[Long](outputDf, (0L to 10L).union((0L to 
10L)).toArray: _*)
-} finally {
-  query.stop()
+val v1Source = 
spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
+val v2Source = 
spark.readStream.format(classOf[FakeFormat].getName).load().select("a")
+
+Seq(v1Source, v2Source).foreach { df =>
--- End diff --

Improved this test to make sure v2 also works.


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226361309
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
 ---
@@ -319,29 +307,18 @@ class RateSourceSuite extends StreamTest {
   "rate source does not support user-specified schema"))
   }
 
-  test("continuous in registry") {
--- End diff --

We don't need this test anymore: with the new `Format` abstraction, the lookup logic is unified between micro-batch and continuous.


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226359031
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchInputStream.scala
 ---
@@ -60,6 +59,14 @@ class RateStreamMicroBatchReadSupport(options: 
DataSourceOptions, checkpointLoca
 s" is $maxSeconds, but 'rampUpTimeSeconds' is $rampUpTimeSeconds.")
   }
 
+  private val numPartitions = {
--- End diff --

moved from 
https://github.com/apache/spark/pull/22547/files#diff-6cd4de793a1c68d3d9415a246823b55eL151


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226355931
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -90,6 +140,8 @@ class ContinuousExecution(
 do {
   runContinuous(sparkSessionForStream)
 } while (state.updateAndGet(stateUpdate) == ACTIVE)
+
+stopSources()
--- End diff --

With the new abstraction, we should only stop sources when the streaming query ends, instead of at each reconfiguration.


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226338580
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java 
---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.BatchScan;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * A mix-in interface for {@link Table}. Table implementations can mixin 
this interface to
+ * provide data reading ability for batch processing.
+ */
+@InterfaceStability.Evolving
+public interface SupportsBatchRead extends Table {
+
+  /**
+   * Creates a {@link BatchScan} instance with a {@link ScanConfig} and 
user-specified options.
+   *
+   * @param config a {@link ScanConfig} which may contains operator 
pushdown information.
+   * @param options the user-specified options, which is same as the one 
used to create the
+   *{@link ScanConfigBuilder} that built the given {@link 
ScanConfig}.
--- End diff --

Another choice is to let `ScanConfig` carry the options, but `ScanConfig` is an interface and doing so would put more work on the user side, so I decided to pass the options again here. Feedback is welcome!
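
For reference, the rejected shape would look roughly like this sketch (not part of the PR; it assumes `ScanConfig` still only requires `readSchema()`): every `ScanConfig` implementation would have to carry the options itself.

```scala
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.ScanConfig
import org.apache.spark.sql.types.StructType

// Sketch of the alternative: ScanConfig exposes the options it was built with, so
// createBatchScan(config) would not need a second options parameter.
trait ScanConfigWithOptions extends ScanConfig {
  def options: DataSourceOptions
}

// Every data source would then have to thread the options through its ScanConfig,
// which is the extra burden on implementors mentioned above.
case class ExampleScanConfig(
    readSchema: StructType,
    options: DataSourceOptions) extends ScanConfigWithOptions
```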


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r220275520
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
 ---
@@ -207,13 +207,13 @@ class KafkaContinuousSourceTopicDeletionSuite extends 
KafkaContinuousTest {
 testUtils.createTopic(topic2, partitions = 5)
 eventually(timeout(streamingTimeout)) {
   assert(
-query.lastExecution.executedPlan.collectFirst {
-  case scan: DataSourceV2ScanExec
-if 
scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
-scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
-}.exists { config =>
+query.lastExecution.logical.collectFirst {
--- End diff --

Now the known partitions are tracked by the `KafkaContinuousInputStream` in the logical plan: 
https://github.com/apache/spark/pull/22547/files#diff-5fa6c9fc023183f4a855f778944d23ebR62


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r220275173
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -106,85 +107,96 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
   failOnDataLoss(caseInsensitiveParams))
   }
 
-  /**
-   * Creates a 
[[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to 
read
-   * batches of Kafka data in a micro-batch streaming query.
-   */
-  override def createMicroBatchReadSupport(
-  metadataPath: String,
-  options: DataSourceOptions): KafkaMicroBatchReadSupport = {
-
-val parameters = options.asMap().asScala.toMap
-validateStreamOptions(parameters)
-// Each running query should use its own group id. Otherwise, the 
query may be only assigned
-// partial data since Kafka will assign partitions to multiple 
consumers having the same group
-// id. Hence, we should generate a unique id for each query.
-val uniqueGroupId = 
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
-val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase(Locale.ROOT), v) }
-val specifiedKafkaParams =
-  parameters
-.keySet
-.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-.map { k => k.drop(6).toString -> parameters(k) }
-.toMap
-
-val startingStreamOffsets = 
KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
-  STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
-
-val kafkaOffsetReader = new KafkaOffsetReader(
-  strategy(caseInsensitiveParams),
-  kafkaParamsForDriver(specifiedKafkaParams),
-  parameters,
-  driverGroupIdPrefix = s"$uniqueGroupId-driver")
-
-new KafkaMicroBatchReadSupport(
-  kafkaOffsetReader,
-  kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
-  options,
-  metadataPath,
-  startingStreamOffsets,
-  failOnDataLoss(caseInsensitiveParams))
+  override def getTable(options: DataSourceOptions): KafkaTable.type = {
+KafkaTable
   }
 
-  /**
-   * Creates a 
[[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to 
read
-   * Kafka data in a continuous streaming query.
-   */
-  override def createContinuousReadSupport(
-  metadataPath: String,
-  options: DataSourceOptions): KafkaContinuousReadSupport = {
-val parameters = options.asMap().asScala.toMap
-validateStreamOptions(parameters)
-// Each running query should use its own group id. Otherwise, the 
query may be only assigned
-// partial data since Kafka will assign partitions to multiple 
consumers having the same group
-// id. Hence, we should generate a unique id for each query.
-val uniqueGroupId = 
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
-val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase(Locale.ROOT), v) }
-val specifiedKafkaParams =
-  parameters
-.keySet
-.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-.map { k => k.drop(6).toString -> parameters(k) }
-.toMap
+  object KafkaTable extends Table
+with SupportsMicroBatchRead with SupportsContinuousRead {
 
-val startingStreamOffsets = 
KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
-  STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
+override def schema(): StructType = KafkaOffsetReader.kafkaSchema
 
-val kafkaOffsetReader = new KafkaOffsetReader(
-  strategy(caseInsensitiveParams),
-  kafkaParamsForDriver(specifiedKafkaParams),
-  parameters,
-  driverGroupIdPrefix = s"$uniqueGroupId-driver")
+/**
+ * Creates a 
[[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchInputStream]] to 
read
+ * batches of Kafka data in a micro-batch streaming query.
+ */
+override def createMicroBatchInputStream(
+checkpointLocation: String,
+config: ScanConfig,
+options: DataSourceOptions): MicroBatchInputStream = {
+  val parameters = options.asMap().asScala.toMap
+  validateStreamOptions(parameters)
+  // Each running query should use its own group id. Otherwise, the 
query may be only assigned
+  // partial data since Kafka will assign partitions to multiple 
consumers having the same group
+  // id. Hence, we should generate a unique id for each query.
+  val uniqueGroupId = 
s"spark-kafka-source-${UUID.randomUUID}-${che

[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r220274862
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchInputStream.scala
 ---
@@ -294,6 +227,88 @@ private[kafka010] class KafkaMicroBatchReadSupport(
   }
 }
 
+private[kafka010] class KafkaMicroBatchScan(
+kafkaOffsetReader: KafkaOffsetReader,
+rangeCalculator: KafkaOffsetRangeCalculator,
+executorKafkaParams: ju.Map[String, Object],
+pollTimeoutMs: Long,
+failOnDataLoss: Boolean,
+reportDataLoss: String => Unit,
+start: KafkaSourceOffset,
+end: KafkaSourceOffset) extends MicroBatchScan with Logging {
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+KafkaMicroBatchReaderFactory
+  }
+
+  override def planInputPartitions(): Array[InputPartition] = {
+val startPartitionOffsets = start.partitionToOffsets
--- End diff --

moved from 
https://github.com/apache/spark/pull/22547/files#diff-314d02b954fc05ec7ae687dd486a8e84L104


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r220275016
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -106,85 +107,96 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
   failOnDataLoss(caseInsensitiveParams))
   }
 
-  /**
-   * Creates a 
[[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to 
read
-   * batches of Kafka data in a micro-batch streaming query.
-   */
-  override def createMicroBatchReadSupport(
-  metadataPath: String,
-  options: DataSourceOptions): KafkaMicroBatchReadSupport = {
-
-val parameters = options.asMap().asScala.toMap
-validateStreamOptions(parameters)
-// Each running query should use its own group id. Otherwise, the 
query may be only assigned
-// partial data since Kafka will assign partitions to multiple 
consumers having the same group
-// id. Hence, we should generate a unique id for each query.
-val uniqueGroupId = 
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
-val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase(Locale.ROOT), v) }
-val specifiedKafkaParams =
-  parameters
-.keySet
-.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-.map { k => k.drop(6).toString -> parameters(k) }
-.toMap
-
-val startingStreamOffsets = 
KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
-  STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
-
-val kafkaOffsetReader = new KafkaOffsetReader(
-  strategy(caseInsensitiveParams),
-  kafkaParamsForDriver(specifiedKafkaParams),
-  parameters,
-  driverGroupIdPrefix = s"$uniqueGroupId-driver")
-
-new KafkaMicroBatchReadSupport(
-  kafkaOffsetReader,
-  kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
-  options,
-  metadataPath,
-  startingStreamOffsets,
-  failOnDataLoss(caseInsensitiveParams))
+  override def getTable(options: DataSourceOptions): KafkaTable.type = {
+KafkaTable
   }
 
-  /**
-   * Creates a 
[[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to 
read
-   * Kafka data in a continuous streaming query.
-   */
-  override def createContinuousReadSupport(
-  metadataPath: String,
-  options: DataSourceOptions): KafkaContinuousReadSupport = {
-val parameters = options.asMap().asScala.toMap
-validateStreamOptions(parameters)
-// Each running query should use its own group id. Otherwise, the 
query may be only assigned
-// partial data since Kafka will assign partitions to multiple 
consumers having the same group
-// id. Hence, we should generate a unique id for each query.
-val uniqueGroupId = 
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
-val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase(Locale.ROOT), v) }
-val specifiedKafkaParams =
-  parameters
-.keySet
-.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-.map { k => k.drop(6).toString -> parameters(k) }
-.toMap
+  object KafkaTable extends Table
+with SupportsMicroBatchRead with SupportsContinuousRead {
 
-val startingStreamOffsets = 
KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
-  STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
+override def schema(): StructType = KafkaOffsetReader.kafkaSchema
 
-val kafkaOffsetReader = new KafkaOffsetReader(
-  strategy(caseInsensitiveParams),
-  kafkaParamsForDriver(specifiedKafkaParams),
-  parameters,
-  driverGroupIdPrefix = s"$uniqueGroupId-driver")
+/**
+ * Creates a 
[[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchInputStream]] to 
read
+ * batches of Kafka data in a micro-batch streaming query.
+ */
+override def createMicroBatchInputStream(
+checkpointLocation: String,
+config: ScanConfig,
+options: DataSourceOptions): MicroBatchInputStream = {
+  val parameters = options.asMap().asScala.toMap
--- End diff --

moved from 
https://github.com/apache/spark/pull/22547/files#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05L117


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r220274562
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala
 ---
@@ -67,28 +71,29 @@ class KafkaContinuousReadSupport(
 offsets
   }
 
-  override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema
-
-  override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
-new KafkaContinuousScanConfigBuilder(fullSchema(), start, 
offsetReader, reportDataLoss)
-  }
-
   override def deserializeOffset(json: String): Offset = {
 KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }
 
-  override def planInputPartitions(config: ScanConfig): 
Array[InputPartition] = {
-val startOffsets = 
config.asInstanceOf[KafkaContinuousScanConfig].startOffsets
-startOffsets.toSeq.map {
-  case (topicPartition, start) =>
-KafkaContinuousInputPartition(
-  topicPartition, start, kafkaParams, pollTimeoutMs, 
failOnDataLoss)
-}.toArray
-  }
+  override def createContinuousScan(start: Offset): ContinuousScan = {
+val oldStartPartitionOffsets = 
KafkaSourceOffset.getPartitionOffsets(start)
--- End diff --

moved from 
https://github.com/apache/spark/pull/22547/files#diff-5fa6c9fc023183f4a855f778944d23ebL162


---




[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-09-25 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/22547

[SPARK-25528][SQL] data source V2 read side API refactoring

## What changes were proposed in this pull request?

Refactor the read side API according to the abstraction proposed in the 
[dev 
list](http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html)

```
batch: catalog -> table -> scan
streaming: catalog -> table -> stream -> scan
```

More concretely, this PR
1. adds a new interface called `Format` that can return a `Table`
2. renames `ReadSupportProvider` to `Table`, which represents a logical data set with a schema
3. adds a new interface `InputStream` to represent a streaming source in a streaming query; it can create `Scan`s
4. renames `ReadSupport` to `Scan`; each `Scan` triggers one Spark job (like an RDD)
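
A read-path-only sketch of how these pieces chain together under the abstraction above (the helper name is made up; it assumes `ScanConfigBuilder.build()` stays as today and that catalog resolution has already produced the `Format`):

```scala
import org.apache.spark.sql.sources.v2.{DataSourceOptions, Format, SupportsBatchRead}
import org.apache.spark.sql.sources.v2.reader.BatchScan

object BatchReadSketch {
  // batch: format -> table -> scan, mirroring "catalog -> table -> scan" above.
  def planBatchScan(format: Format, options: DataSourceOptions): BatchScan = {
    val table = format.getTable(options)                      // Format returns a Table
    val config = table.newScanConfigBuilder(options).build()  // pushdown lives in the builder
    table match {                                             // a capability mix-in creates the scan
      case t: SupportsBatchRead => t.createBatchScan(config, options)
      case _ => throw new IllegalArgumentException(
        s"${table.getClass.getName} does not support batch reads")
    }
  }
}
```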

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark new-idea

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22547.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22547


commit 92dfdaf990f2676d49766f5ab094e8b8a9a755b1
Author: Wenchen Fan 
Date:   2018-08-27T15:20:08Z

data source V2 read side API refactoring




---
