[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

2018-12-07 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/23208
  
@cloud-fan, what design are you suggesting? If you think this shouldn't 
mirror the read side, then let's be clear about what it should look like. 
Maybe that's a design doc, or maybe that's a discussion thread on the 
mailing list.

Whatever option we go for, we still need to have a plan for exposing the 
replace-by-filter and replace-dynamic-partitions methods, whatever they end up 
being. We also need the life-cycle to match.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239889152
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,52 +17,49 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.util.UUID
-
-import scala.collection.JavaConverters._
+import java.util.{Optional, UUID}
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
 import org.apache.spark.sql.types.StructType
 
 /**
- * A logical plan representing a data source v2 scan.
+ * A logical plan representing a data source v2 table.
  *
- * @param source An instance of a [[DataSourceV2]] implementation.
- * @param options The options for this scan. Used to create fresh 
[[BatchWriteSupport]].
- * @param userSpecifiedSchema The user-specified schema for this scan.
+ * @param table The table that this relation represents.
+ * @param options The options for this table operation. It's used to 
create fresh [[ScanBuilder]]
+ *and [[BatchWriteSupport]].
  */
 case class DataSourceV2Relation(
-// TODO: remove `source` when we finish API refactor for write.
-source: TableProvider,
-table: SupportsBatchRead,
+table: Table,
 output: Seq[AttributeReference],
-options: Map[String, String],
-userSpecifiedSchema: Option[StructType] = None)
+// TODO: use a simple case insensitive map instead.
+options: DataSourceOptions)
--- End diff --

A private method to do that existed in the past. Why not just revive it?


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239888975
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java 
---
@@ -25,14 +25,14 @@
 import org.apache.spark.sql.types.StructType;
 
 /**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * A mix-in interface for {@link Table}. Data sources can implement this 
interface to
  * provide data writing ability for batch processing.
  *
  * This interface is used to create {@link BatchWriteSupport} instances 
when end users run
  * {@code Dataset.write.format(...).option(...).save()}.
  */
 @Evolving
-public interface BatchWriteSupportProvider extends DataSourceV2 {
+public interface SupportsBatchWrite extends Table {
--- End diff --

I'm fine either way, as long as we are consistent between the read and 
write sides.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239888795
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -25,7 +25,10 @@
  * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
  * have a public, 0-arg constructor.
  * 
- * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write. If you
+ * want to allow end-users to write data to non-existing tables via write 
APIs in `DataFrameWriter`
+ * with `SaveMode`, you must return a {@link Table} instance even if the 
table doesn't exist. The
+ * table schema can be empty in this case.
--- End diff --

I think we can remove SaveMode right away. We don't need to break existing 
use cases if we add the OverwriteData plan and use it when the user's mode is 
Overwrite. That helps us get to the point where we can integrate SQL on top of 
this faster.
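
For illustration, a rough sketch of that mapping inside the save() path 
(treating OverwriteData as having a byName helper like AppendData is my 
assumption, not something this PR defines):

```scala
// Hypothetical fragment of DataFrameWriter.save: translate SaveMode into the
// proposed v2 logical plans instead of passing SaveMode to the source.
mode match {
  case SaveMode.Append =>
    AppendData.byName(relation, df.logicalPlan)
  case SaveMode.Overwrite =>
    OverwriteData.byName(relation, df.logicalPlan)
  case other =>
    // ErrorIfExists and Ignore need an existence check (and eventually CTAS).
    throw new AnalysisException(s"SaveMode $other is not supported for v2 yet")
}
```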


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239613722
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java 
---
@@ -25,14 +25,14 @@
 import org.apache.spark.sql.types.StructType;
 
 /**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * A mix-in interface for {@link Table}. Data sources can implement this 
interface to
  * provide data writing ability for batch processing.
  *
  * This interface is used to create {@link BatchWriteSupport} instances 
when end users run
  * {@code Dataset.write.format(...).option(...).save()}.
  */
 @Evolving
-public interface BatchWriteSupportProvider extends DataSourceV2 {
+public interface SupportsBatchWrite extends Table {
--- End diff --

`Table` exposes `newScanBuilder` without an interface. Why should the write 
side be different? Doesn't Spark support sources that are read-only or 
write-only?

I think that both reads and writes should use interfaces to mix support 
into `Table` or both should be exposed by `Table` and throw 
`UnsupportedOperationException` by default, not a mix of the two options.
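
For illustration, the two shapes side by side (a self-contained Scala sketch 
with placeholder types; the real interfaces are Java, and the write-side 
method name here is only an assumption):

```scala
// Placeholder stand-ins for the real API types, so the sketch compiles alone.
trait StructType
trait DataSourceOptions
trait ScanBuilder
trait BatchWriteSupport

// Option 1: capabilities are mix-in traits; Spark pattern-matches on them.
object MixInStyle {
  trait Table { def schema: StructType }
  trait SupportsBatchRead extends Table {
    def newScanBuilder(options: DataSourceOptions): ScanBuilder
  }
  trait SupportsBatchWrite extends Table {
    def newWriteSupport(options: DataSourceOptions): BatchWriteSupport
  }
}

// Option 2: both methods live on Table and throw by default; sources override
// only the side they support.
object DefaultMethodStyle {
  trait Table {
    def schema: StructType
    def newScanBuilder(options: DataSourceOptions): ScanBuilder =
      throw new UnsupportedOperationException("read is not supported")
    def newWriteSupport(options: DataSourceOptions): BatchWriteSupport =
      throw new UnsupportedOperationException("write is not supported")
  }
}
```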


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239613088
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,52 +17,49 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.util.UUID
-
-import scala.collection.JavaConverters._
+import java.util.{Optional, UUID}
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
 import org.apache.spark.sql.types.StructType
 
 /**
- * A logical plan representing a data source v2 scan.
+ * A logical plan representing a data source v2 table.
  *
- * @param source An instance of a [[DataSourceV2]] implementation.
- * @param options The options for this scan. Used to create fresh 
[[BatchWriteSupport]].
- * @param userSpecifiedSchema The user-specified schema for this scan.
+ * @param table The table that this relation represents.
+ * @param options The options for this table operation. It's used to 
create fresh [[ScanBuilder]]
+ *and [[BatchWriteSupport]].
  */
 case class DataSourceV2Relation(
-// TODO: remove `source` when we finish API refactor for write.
-source: TableProvider,
-table: SupportsBatchRead,
+table: Table,
 output: Seq[AttributeReference],
-options: Map[String, String],
-userSpecifiedSchema: Option[StructType] = None)
+// TODO: use a simple case insensitive map instead.
+options: DataSourceOptions)
--- End diff --

Why change this now, when DataSourceOptions will be replaced? I would say 
just leave it as a map and update it once later.


---




[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

2018-12-06 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/23208
  
@cloud-fan, I see that this adds `Table` and uses `TableProvider`, but I 
was expecting this to also update the write side to mirror the read side, like 
PR #22190 for [SPARK-25188](https://issues.apache.org/jira/browse/SPARK-25188) 
(originally proposed in [discussion on 
SPARK-24882](https://issues.apache.org/jira/browse/SPARK-24882?focusedCommentId=16581725=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16581725)).

The main parts that we discussed there were:
* Mirror the read-side structure by adding WriteConfig. Now, that would be 
adding a WriteBuilder.
* Mirror the read life-cycle of ScanBuilder and Scan, to enable use cases 
like acquiring and holding a write lock.
* Use the WriteBuilder to expose more write configuration, to support 
overwrite and dynamic partition overwrite.

We don't need to add the overwrite mix-ins here, but I would expect to see 
a WriteBuilder that returns a Write. (`Table -> WriteBuilder -> Write` matches 
`Table -> ScanBuilder -> Scan`.)

The Write would expose BatchWrite and StreamWrite (if they are different) 
or could directly expose the WriteFactory, commit, abort, etc.

WriteBuilder would be extensible so that SupportsOverwrite and 
SupportsDynamicOverwrite can be added as mix-ins at some point.
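
Roughly this shape, sketched in Scala (the real interfaces would be Java; the 
method names and signatures are illustrative, only the type names come from 
the discussion above):

```scala
import org.apache.spark.sql.sources.Filter

// Stand-ins so the sketch compiles on its own; the real versions would carry
// the writer factory plus commit/abort (batch) and epoch handling (streaming).
trait BatchWrite
trait StreamWrite

// Table -> WriteBuilder -> Write, mirroring Table -> ScanBuilder -> Scan.
trait Write {
  def toBatch: BatchWrite
  def toStream: StreamWrite
}

trait WriteBuilder {
  def build(): Write
}

// Overwrite support added later as mix-ins on the builder, analogous to the
// mix-ins on the read side.
trait SupportsOverwrite extends WriteBuilder {
  def overwrite(filters: Array[Filter]): WriteBuilder
}

trait SupportsDynamicOverwrite extends WriteBuilder {
  def overwriteDynamicPartitions(): WriteBuilder
}
```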



---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239598346
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
 assertNotBucketed("save")
 
-val cls = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
-if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-  val source = 
cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
-  source match {
-case provider: BatchWriteSupportProvider =>
-  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-source,
-df.sparkSession.sessionState.conf)
-  val options = sessionOptions ++ extraOptions
-
+val session = df.sparkSession
+val cls = DataSource.lookupDataSource(source, 
session.sessionState.conf)
+if (classOf[TableProvider].isAssignableFrom(cls)) {
+  val provider = 
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+provider, session.sessionState.conf)
+  val options = sessionOptions ++ extraOptions
+  val dsOptions = new DataSourceOptions(options.asJava)
+  provider.getTable(dsOptions) match {
+case table: SupportsBatchWrite =>
+  val relation = DataSourceV2Relation.create(table, dsOptions)
+  // TODO: revisit it. We should not create the `AppendData` 
operator for `SaveMode.Append`.
+  // We should create new end-users APIs for the `AppendData` 
operator.
--- End diff --

Here is what my branch uses for this logic:

```scala
val maybeTable = provider.getTable(identifier)
val exists = maybeTable.isDefined
(exists, mode) match {
  case (true, SaveMode.ErrorIfExists) =>
    throw new AnalysisException(s"Table already exists: ${identifier.quotedString}")

  case (true, SaveMode.Overwrite) =>
    val relation = DataSourceV2Relation.create(
      catalog.name, identifier, maybeTable.get, options)

    runCommand(df.sparkSession, "insertInto") {
      OverwritePartitionsDynamic.byName(relation, df.logicalPlan)
    }

  case (true, SaveMode.Append) =>
    val relation = DataSourceV2Relation.create(
      catalog.name, identifier, maybeTable.get, options)

    runCommand(df.sparkSession, "save") {
      AppendData.byName(relation, df.logicalPlan)
    }

  case (false, SaveMode.Append) |
       (false, SaveMode.ErrorIfExists) |
       (false, SaveMode.Ignore) |
       (false, SaveMode.Overwrite) =>
    runCommand(df.sparkSession, "save") {
      CreateTableAsSelect(catalog, identifier, Seq.empty, df.logicalPlan, options,
        ignoreIfExists = mode == SaveMode.Ignore)
    }

  case _ =>
    // table exists and mode is ignore
}
```


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239596456
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 
 assertNotBucketed("save")
 
-val cls = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
-if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-  val source = 
cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
-  source match {
-case provider: BatchWriteSupportProvider =>
-  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-source,
-df.sparkSession.sessionState.conf)
-  val options = sessionOptions ++ extraOptions
-
+val session = df.sparkSession
+val cls = DataSource.lookupDataSource(source, 
session.sessionState.conf)
+if (classOf[TableProvider].isAssignableFrom(cls)) {
+  val provider = 
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+provider, session.sessionState.conf)
+  val options = sessionOptions ++ extraOptions
+  val dsOptions = new DataSourceOptions(options.asJava)
+  provider.getTable(dsOptions) match {
+case table: SupportsBatchWrite =>
+  val relation = DataSourceV2Relation.create(table, dsOptions)
+  // TODO: revisit it. We should not create the `AppendData` 
operator for `SaveMode.Append`.
+  // We should create new end-users APIs for the `AppendData` 
operator.
--- End diff --

The example in the referenced comment is this:

```
spark.range(1).write.format("source").save("non-existent-path")
```

If a path for a path-based table doesn't exist, then I think that the table 
doesn't exist. If a table doesn't exist, then the operation for `save` should 
be CTAS instead of AppendData.

Here, I think the right behavior is to check whether the provider returns a 
table. If it doesn't, then the table doesn't exist and the plan should be CTAS. 
If it does, then it must provide the schema used to validate the AppendData 
operation. Since we don't currently have CTAS, this should throw an exception 
stating that the table doesn't exist and can't be created.

More generally, the meaning of SaveMode with v1 is not always reliable. I 
think the right approach is what @cloud-fan suggests: create a new write API 
for v2 tables that is clear for the new logical plans (I've proposed one and 
would be happy to open a PR). Once the logical plans are in place, we can go 
back through this API and move it over to v2 where the behaviors match.
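
In code, the check described above would be roughly this (a sketch only; 
`findTable` returning an Option is a hypothetical stand-in for a provider 
that can report whether the table exists):

```scala
// Hypothetical sketch of the save() path: resolve the table first, then pick
// the logical plan based on whether it exists.
provider.findTable(dsOptions) match {
  case Some(table: SupportsBatchWrite) =>
    // The existing table provides the schema that AppendData is validated against.
    val relation = DataSourceV2Relation.create(table, dsOptions)
    runCommand(df.sparkSession, "save") {
      AppendData.byName(relation, df.logicalPlan)
    }

  case Some(_) =>
    throw new AnalysisException(s"Table does not support batch writes: $source")

  case None =>
    // The right plan would be CTAS, which v2 doesn't have yet, so fail rather
    // than appending to a table that doesn't exist.
    throw new AnalysisException(s"Table does not exist and cannot be created: $source")
}
```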


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239581374
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -25,7 +25,10 @@
  * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
  * have a public, 0-arg constructor.
  * 
- * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write. If you
+ * want to allow end-users to write data to non-existing tables via write 
APIs in `DataFrameWriter`
+ * with `SaveMode`, you must return a {@link Table} instance even if the 
table doesn't exist. The
+ * table schema can be empty in this case.
--- End diff --

Maybe it should also be part of the `TableProvider` contract that if the 
table can't be located, it throws an exception?


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239578059
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -25,7 +25,10 @@
  * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
  * have a public, 0-arg constructor.
  * 
- * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write. If you
+ * want to allow end-users to write data to non-existing tables via write 
APIs in `DataFrameWriter`
+ * with `SaveMode`, you must return a {@link Table} instance even if the 
table doesn't exist. The
+ * table schema can be empty in this case.
--- End diff --

@jose-torres, create on write is done by CTAS. It should not be left up to 
the source whether to fail or create.

I think the confusion here is that this is a degenerate case where Spark 
has no ability to interact with the table's metadata. Spark must assume that it 
exists because the caller is writing to it.

The caller is indicating that a table exists, that it is identified by some 
configuration, and that a specific implementation can be used to write to it. 
That's what happens today when source implementations are directly specified.


---




[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

2018-12-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23208#discussion_r239559037
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -25,7 +25,10 @@
  * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
  * have a public, 0-arg constructor.
  * 
- * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write. If you
+ * want to allow end-users to write data to non-existing tables via write 
APIs in `DataFrameWriter`
+ * with `SaveMode`, you must return a {@link Table} instance even if the 
table doesn't exist. The
+ * table schema can be empty in this case.
--- End diff --

What does it mean to write to a non-existing table? If you're writing 
somewhere, the table must exist.

This is for creating a table directly from configuration and an 
implementation class in the DataFrameWriter API. The target of the write still 
needs to exist.


---




[GitHub] spark issue #23055: [SPARK-26080][PYTHON] Skips Python resource limit on Win...

2018-12-03 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/23055
  
@HyukjinKwon, for the future, I should note that I'm not a committer so my 
+1 for a PR is not binding. I'm fairly sure @vanzin would +1 this commit as 
well, but it's best not to merge based on my approval.


---




[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

2018-12-03 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/23208
  
Thanks for posting this PR @cloud-fan! I'll have a look in the next day or 
so.


---




[GitHub] spark issue #23055: [SPARK-26080][PYTHON] Skips Python resource limit on Win...

2018-11-30 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/23055
  
+1 with the latest changes. Thanks for taking care of this, @HyukjinKwon!

The functionality is in two parts: changing the resource requests (which 
still applies on Windows) and limiting memory use in Python.

It is too bad that this broke, but I'm not sure how to deal with a platform 
that, as you say, has few contributors. I certainly wouldn't want to gate a 
feature like this on making sure someone tested it in Windows, unless we have 
CI set up for Windows builds.


---




[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-11-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r238046730
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransforms.java
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+/**
+ * A standard set of transformations that are passed to data sources 
during table creation.
+ *
+ * @see PartitionTransform
+ */
+public class PartitionTransforms {
+  private PartitionTransforms() {
+  }
+
+  /**
+   * Create a transform for a column with the given name.
+   * 
+   * This transform is used to pass named transforms that are not known to 
Spark.
+   *
+   * @param transform a name of the transform to apply to the column
+   * @param colName a column name
+   * @return an Apply transform for the column
+   */
+  public static PartitionTransform apply(String transform, String colName) 
{
+if ("identity".equals(transform)) {
--- End diff --

I think we should get this done now. Partition transforms are a 
generalization of Hive partitioning (which uses some columns directly) and 
bucketing (which is one specific transform). If we add transformation functions 
now, we will support both of those with a simple API instead of building in 
special cases for identity and bucket transforms.

I also have a data source that allows users to configure partitioning using 
more transforms than just identity and bucketing, so I'd like to get this in so 
that DDL for those tables works.


---




[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r237995405
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java 
---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
--- End diff --

I just went to make this change, but it requires moving any SQL class from 
catalyst referenced by the API into the API module as well... Let's discuss the 
options more on the dev list thread.


---




[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-11-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r237984050
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public interface TableCatalog extends CatalogProvider {
--- End diff --

What about `CatalogPlugin`? I'm hesitant to go with just `Catalog` because 
it isn't very specific. I think it might cause confusion because the interface 
has only the `initialize` method.


---




[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...

2018-11-30 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21306
  
@stczwd, I agree with @mccheah. Tables are basically named data sets. 
Whether they support batch, micro-batch streaming, or continuous streaming is 
determined by checking whether they implement SupportsBatchScan or similar 
interfaces. Matt's referenced docs are the right place to go for more context. 
The purpose here is to make catalogs and reads orthogonal. A catalog can return 
both batch-compatible and stream-compatible source "tables".

A "table" may be a Kafka topic or may be a file-based data source. And note 
that both of those can support batch and streaming execution. A Kafka topic 
could be CDC stream that represents a table, and a file-based source could be 
streamed by periodically checking for new committed files.

This PR is based on an 
[SPIP](https://docs.google.com/document/d/1zLFiA1VuaWeVxeTDXNg8bL6GP3BVoOZBkewFtEnjEoo/edit#heading=h.7vhjx9226jbt).
 That has some background for why I chose the set of table attributes here 
(schema, partitioning, properties), but a short summary is that those are the 
core set of attributes that are used in comparable SQL variants and already 
used in Spark.


---




[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-11-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r237975013
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogProvider.java 
---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.internal.SQLConf;
+
+/**
+ * A marker interface to provide a catalog implementation for Spark.
+ * 
+ * Implementations can provide catalog functions by implementing 
additional interfaces, like
+ * {@link TableCatalog} to expose table operations.
+ * 
+ * Catalog implementations must implement this marker interface to be 
loaded by
+ * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate 
catalog classes using the
+ * required public no-arg constructor. After creating an instance, it will 
be configured by calling
+ * {@link #initialize(CaseInsensitiveStringMap)}.
+ * 
+ * Catalog implementations are registered to a name by adding a 
configuration option to Spark:
+ * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. 
All configuration properties
+ * in the Spark configuration that share the catalog name prefix,
+ * {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in 
the case insensitive
+ * string map of options in initialization with the prefix removed. An 
additional property,
+ * {@code name}, is also added to the options and will contain the 
catalog's name; in this case,
+ * "catalog-name".
+ */
+public interface CatalogProvider {
--- End diff --

@cloud-fan, do you want me to create the `sql-api` package in this PR, or 
do you want to add a separate PR to move the current v2 API?


---




[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-11-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r237974718
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransforms.java
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+/**
+ * A standard set of transformations that are passed to data sources 
during table creation.
+ *
+ * @see PartitionTransform
+ */
+public class PartitionTransforms {
+  private PartitionTransforms() {
+  }
+
+  /**
+   * Create a transform for a column with the given name.
+   * 
+   * This transform is used to pass named transforms that are not known to 
Spark.
+   *
+   * @param transform a name of the transform to apply to the column
+   * @param colName a column name
+   * @return an Apply transform for the column
+   */
+  public static PartitionTransform apply(String transform, String colName) 
{
+if ("identity".equals(transform)) {
--- End diff --

I should note that the generic function application will probably look like 
the `Apply` case.


---




[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-11-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r237974410
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents table metadata from a {@link TableCatalog} or other table 
sources.
--- End diff --

No, this interface carries the minimal set of operations needed to implement 
the v2 logical plans. We can expand it later when we need to.

The goal here is to build a replacement catalog API incrementally and to 
avoid requiring all catalogs to implement all possible catalog features. This 
API is focused on table operations, not view or function operations that we 
have yet to define.


---




[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-11-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r237973548
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/V1MetadataTable.scala 
---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.catalog.v2.PartitionTransforms.{bucket, 
identity}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
+
+/**
+ * An implementation of catalog v2 [[Table]] to expose v1 table metadata.
+ */
+private[sql] class V1MetadataTable(
+v1Table: CatalogTable,
+v2Source: Option[DataSourceV2]) extends Table {
+
+  def readDelegate: ReadSupport = v2Source match {
+case r: ReadSupport => r
+case _ => throw new UnsupportedOperationException(s"Source does not 
support reads: $v2Source")
+  }
+
+  def writeDelegate: WriteSupport = v2Source match {
+case w: WriteSupport => w
+case _ => throw new UnsupportedOperationException(s"Source does not 
support writes: $v2Source")
+  }
+
+  lazy val options: Map[String, String] = {
+v1Table.storage.locationUri match {
--- End diff --

I use lazy for a couple reasons. First, to avoid building maps or other 
data values that are never used. Second, to avoid a required ordering for 
fields. If fields depend on one another, then they have to be reordered when 
those dependencies change. Lazy values never require reordering.
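
A minimal, standalone example of the ordering point (not the actual 
V1MetadataTable code):

```scala
// With strict vals, `options` would run during construction and see `uri` as
// null, because `uri` is declared later. With lazy vals, declaration order
// doesn't matter and neither value is computed until it is first used.
class LazyOrderingExample(locationUri: Option[String]) {
  lazy val options: Map[String, String] =
    uri.map(u => Map("path" -> u)).getOrElse(Map.empty)

  lazy val uri: Option[String] = locationUri
}
```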


---




[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-11-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r237972742
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/V1MetadataTable.scala 
---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.catalog.v2.PartitionTransforms.{bucket, 
identity}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
+
+/**
+ * An implementation of catalog v2 [[Table]] to expose v1 table metadata.
+ */
+private[sql] class V1MetadataTable(
+v1Table: CatalogTable,
+v2Source: Option[DataSourceV2]) extends Table {
+
+  def readDelegate: ReadSupport = v2Source match {
+case r: ReadSupport => r
+case _ => throw new UnsupportedOperationException(s"Source does not 
support reads: $v2Source")
+  }
+
+  def writeDelegate: WriteSupport = v2Source match {
+case w: WriteSupport => w
+case _ => throw new UnsupportedOperationException(s"Source does not 
support writes: $v2Source")
+  }
+
+  lazy val options: Map[String, String] = {
+v1Table.storage.locationUri match {
--- End diff --

How would the `getOrElse` pattern work here? If the URI is undefined, what 
tuple should be added to the table properties?


---




[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-11-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r237972182
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public interface TableCatalog extends CatalogProvider {
--- End diff --

The intent is to use some interface to load all catalogs, whether they 
implement `TableCatalog`, `FunctionCatalog`, or both (or other catalog API 
parts). So you load a catalog, then check whether it is a `TableCatalog` when 
you want to use it for tables.

Sounds like the name `CatalogProvider` is the confusing part. You're right 
that a provider usually implements a get method to provide something. I could 
change that to `CatalogImpl` or something. Would that work?
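
For example, the intended flow would be roughly this (a sketch assuming the 
`Catalogs.load(String, SQLConf)` entry point from this PR; the catalog name 
"prod" and the error message are illustrative):

```scala
// Load the configured plugin, then check which catalog capabilities it exposes.
// Registration would look like:
//   spark.sql.catalog.prod=com.example.YourCatalogClass
val plugin = Catalogs.load("prod", spark.sessionState.conf)

plugin match {
  case tables: TableCatalog =>
    tables // safe to use for table operations
  case _ =>
    throw new AnalysisException("Catalog 'prod' does not support table operations")
}
```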


---




[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-11-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r237971241
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents table metadata from a {@link TableCatalog} or other table 
sources.
+ */
+public interface Table {
+  /**
+   * Return the table properties.
+   * @return this table's map of string properties
+   */
+  Map properties();
--- End diff --

Yeah, that works.


---




[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-11-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r237971288
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents table metadata from a {@link TableCatalog} or other table 
sources.
+ */
+public interface Table {
+  /**
+   * Return the table properties.
+   * @return this table's map of string properties
+   */
+  Map properties();
+
+  /**
+   * Return the table schema.
+   * @return this table's schema as a struct type
+   */
+  StructType schema();
+
+  /**
+   * Return the table partitioning transforms.
+   * @return this table's partitioning transforms
+   */
+  List partitioning();
--- End diff --

Sure


---




[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-11-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r237971092
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransforms.java
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+/**
+ * A standard set of transformations that are passed to data sources 
during table creation.
+ *
+ * @see PartitionTransform
+ */
+public class PartitionTransforms {
+  private PartitionTransforms() {
+  }
+
+  /**
+   * Create a transform for a column with the given name.
+   * 
+   * This transform is used to pass named transforms that are not known to 
Spark.
+   *
+   * @param transform a name of the transform to apply to the column
+   * @param colName a column name
+   * @return an Apply transform for the column
+   */
+  public static PartitionTransform apply(String transform, String colName) 
{
+if ("identity".equals(transform)) {
--- End diff --

What I wanted to discuss on Wednesday was how to pass these transforms. 
@rxin and I had some discussions about it on the dev list, but we didn't come 
up with a decision. I think the solution will probably be to add a way to pass 
generic function application and a list of arguments that are either columns or 
constant literals.
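
One possible shape for that, purely as an illustration (none of these names 
come from this PR):

```scala
object PartitionTransformSketch {
  // A generic function-application transform that carries a function name
  // plus arguments that are either column references or constant literals.
  sealed trait TransformArg
  case class ColumnRef(name: String) extends TransformArg
  case class LiteralArg(value: Any) extends TransformArg

  case class ApplyTransform(name: String, args: Seq[TransformArg])

  // Identity partitioning and bucketing both fit this shape, so they don't
  // need to be special cases:
  val byCategory = ApplyTransform("identity", Seq(ColumnRef("category")))
  val byBucket = ApplyTransform("bucket", Seq(LiteralArg(16), ColumnRef("id")))
}
```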


---




[GitHub] spark issue #23086: [SPARK-25528][SQL] data source v2 API refactor (batch re...

2018-11-30 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/23086
  
@cloud-fan, thanks for getting this done! I'll wait for the equivalent 
write-side PR.


---




[GitHub] spark issue #23086: [SPARK-25528][SQL] data source v2 API refactor (batch re...

2018-11-30 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/23086
  
> I still do not think we should mix the catalog support with the data 
source APIs

We are trying to keep these separate. `Table` is the only overlap between 
the two. If you prefer more separation, we could move the `newScanBuilder` 
method to a separate interface that readable data sources implement.

> Catalog is a well-defined concept in database systems, as what Spark SQL 
follows. The so-called "table catalog" is not a catalog to me.

I'm glad that you're interested in joining the discussion on multi-catalog 
support. Let's have that discussion on the catalog issues or discussion threads 
on the dev list, not here on an update to the read API.


---




[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r237966188
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.sources.v2.SupportsBatchRead;
+import org.apache.spark.sql.sources.v2.Table;
+
+/**
+ * A logical representation of a data source scan. This interface is used 
to provide logical
+ * information, like what the actual read schema is.
+ * 
+ * This logical representation is shared between batch scan, micro-batch 
streaming scan and
+ * continuous streaming scan. Data sources must implement the 
corresponding methods in this
+ * interface, to match what the table promises to support. For example, 
{@link #toBatch()} must be
+ * implemented, if the {@link Table} that creates this {@link Scan} 
implements
+ * {@link SupportsBatchRead}.
+ * 
+ */
+@Evolving
+public interface Scan {
+
+  /**
+   * Returns the actual schema of this data source scan, which may be 
different from the physical
+   * schema of the underlying storage, as column pruning or other 
optimizations may happen.
+   */
+  StructType readSchema();
+
+  /**
+   * A description string of this scan, which may includes information 
like: what filters are
+   * configured for this scan, what's the value of some important options 
like path, etc. The
+   * description doesn't need to include {@link #readSchema()}, as Spark 
already knows it.
+   * 
+   * By default this returns the class name of the implementation. Please 
override it to provide a
+   * meaningful description.
+   * 
+   */
+  default String description() {
--- End diff --

What about adding `pushedFilters` that defaults to `new Filter[0]`? Then 
users should override that to add filters to the description, if they are 
pushed. I think a Scan should be able to report its options, especially those 
that distinguish it from other scans, like pushed filters.

I guess we could have some wrapper around the user-provided Scan that holds 
the Scan options. I would want to standardize that instead of doing it in every 
scan exec node.
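
Something like this, sketched in Scala (the actual interface is Java; the 
description() body is just an example of how pushed filters could feed it):

```scala
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Sketch: pushedFilters defaults to empty and is overridden by scans that
// actually push filters, so the description (and the exec node) can report them.
trait Scan {
  def readSchema(): StructType

  def pushedFilters(): Array[Filter] = Array.empty

  def description(): String =
    s"${this.getClass.getName} [pushed filters: ${pushedFilters().mkString(", ")}]"
}
```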


---




[GitHub] spark issue #23055: [SPARK-26080][PYTHON] Skips Python resource limit on Win...

2018-11-30 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/23055
  
+1 once the docs are updated to note that resource requests still include 
Python memory, even on Windows.


---




[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Skips Python resource limit...

2018-11-30 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23055#discussion_r237963488
  
--- Diff: docs/configuration.md ---
@@ -190,6 +190,8 @@ of the most common options to set are:
 and it is up to the application to avoid exceeding the overhead memory 
space
 shared with other non-JVM processes. When PySpark is run in YARN or 
Kubernetes, this memory
 is added to executor resource requests.
+
+NOTE: This configuration is not supported on Windows.
--- End diff --

I agree. A better note would be something like "Python memory usage may not 
be limited on platforms that do not support resource limiting, such as Windows".


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21978: [SPARK-25006][SQL] Add CatalogTableIdentifier.

2018-11-29 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21978
  
Rebased on master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23086: [SPARK-25528][SQL] data source v2 API refactor (batch re...

2018-11-29 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/23086
  
+1

There are only minor suggestions left from me. I'd like to see the default 
implementation of `Table.name` removed, but I don't think that should block 
committing this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-29 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r237670228
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -22,86 +22,56 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, 
WholeStageCodegenExec}
-import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.DataSourceV2
 import org.apache.spark.sql.sources.v2.reader._
-import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory,
 ContinuousReadSupport, MicroBatchReadSupport}
 
 /**
- * Physical plan node for scanning data from a data source.
+ * Physical plan node for scanning a batch of data from a data source.
  */
 case class DataSourceV2ScanExec(
 output: Seq[AttributeReference],
-@transient source: DataSourceV2,
-@transient options: Map[String, String],
-@transient pushedFilters: Seq[Expression],
-@transient readSupport: ReadSupport,
-@transient scanConfig: ScanConfig)
-  extends LeafExecNode with DataSourceV2StringFormat with 
ColumnarBatchScan {
+scanDesc: String,
+@transient batch: Batch)
--- End diff --

Sounds good to me.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-29 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r237670099
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.sources.v2.SupportsBatchRead;
+import org.apache.spark.sql.sources.v2.Table;
+
+/**
+ * A logical representation of a data source scan. This interface is used 
to provide logical
+ * information, like what the actual read schema is.
+ * 
+ * This logical representation is shared between batch scan, micro-batch 
streaming scan and
+ * continuous streaming scan. Data sources must implement the 
corresponding methods in this
+ * interface, to match what the table promises to support. For example, 
{@link #toBatch()} must be
+ * implemented, if the {@link Table} that creates this {@link Scan} 
implements
+ * {@link SupportsBatchRead}.
+ * 
+ */
+@Evolving
+public interface Scan {
+
+  /**
+   * Returns the actual schema of this data source scan, which may be 
different from the physical
+   * schema of the underlying storage, as column pruning or other 
optimizations may happen.
+   */
+  StructType readSchema();
+
+  /**
+   * A description string of this scan, which may includes information 
like: what filters are
+   * configured for this scan, what's the value of some important options 
like path, etc. The
+   * description doesn't need to include {@link #readSchema()}, as Spark 
already knows it.
+   * 
+   * By default this returns the class name of the implementation. Please 
override it to provide a
+   * meaningful description.
+   * 
+   */
+  default String description() {
--- End diff --

I would have expected the default implementation to show both pushed 
filters and the read schema, along with the implementation class name. Read 
schema can be accessed by `readSchema`. Should there also be a way to access 
the pushed filters? `pushedFilters` seems like a good idea to me. (This can be 
added later)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-29 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r237668483
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java 
---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.reader.Scan;
+import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface representing a logical structured data set of a data 
source. For example, the
+ * implementation can be a directory on the file system, a topic of Kafka, 
or a table in the
+ * catalog, etc.
+ * 
+ * This interface can mixin the following interfaces to support different 
operations:
+ * 
+ * 
+ *   {@link SupportsBatchRead}: this table can be read in batch 
queries.
+ * 
+ */
+@Evolving
+public interface Table {
+
+  /**
+   * A name to identify this table.
+   * 
+   * By default this returns the class name of this implementation. Please 
override it to provide a
+   * meaningful name, like the database and table name from catalog, or 
the location of files for
+   * this table.
+   * 
+   */
+  default String name() {
--- End diff --

I don't think this should have a default. Implementations should definitely 
implement this.

I think there is a difference between `toString` and `name`. An 
implementation may choose to display `name` when showing a table's string 
representation, but may choose to include extra information to show more about 
the table state, like Iceberg's snapshot ID.
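
A hedged sketch of that distinction (hypothetical class, loosely modeled on the Iceberg example):

```scala
// Hypothetical table: `name` identifies it, while `toString` adds extra state
// such as the snapshot being read (as Iceberg might).
case class SketchTable(db: String, table: String, snapshotId: Long) {
  def name: String = s"$db.$table"

  override def toString: String = s"$name (snapshot=$snapshotId)"
}

// SketchTable("prod", "events", 42L).toString == "prod.events (snapshot=42)"
```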


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21978: [SPARK-25006][SQL] Add CatalogTableIdentifier.

2018-11-29 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21978#discussion_r237660050
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala ---
@@ -18,48 +18,106 @@
 package org.apache.spark.sql.catalyst
 
 /**
- * An identifier that optionally specifies a database.
+ * An identifier that optionally specifies a database and catalog.
  *
  * Format (unquoted): "name" or "db.name"
  * Format (quoted): "`name`" or "`db`.`name`"
  */
-sealed trait IdentifierWithDatabase {
+sealed trait IdentifierWithOptionalDatabaseAndCatalog {
   val identifier: String
 
   def database: Option[String]
 
+  def catalog: Option[String]
+
   /*
* Escapes back-ticks within the identifier name with double-back-ticks.
*/
   private def quoteIdentifier(name: String): String = name.replace("`", 
"``")
 
   def quotedString: String = {
-val replacedId = quoteIdentifier(identifier)
-val replacedDb = database.map(quoteIdentifier(_))
-
-if (replacedDb.isDefined) s"`${replacedDb.get}`.`$replacedId`" else 
s"`$replacedId`"
+// database is required if catalog is present
+assert(database.isDefined || catalog.isEmpty)
+def q(s: String): String = s"`${quoteIdentifier(s)}`"
+Seq(catalog.map(q), database.map(q), 
Some(q(identifier))).flatten.mkString(".")
   }
 
   def unquotedString: String = {
-if (database.isDefined) s"${database.get}.$identifier" else identifier
+Seq(catalog, database, Some(identifier)).flatten.mkString(".")
   }
 
   override def toString: String = quotedString
 }
 
 
+object CatalogTableIdentifier {
+  def apply(table: String): CatalogTableIdentifier =
+new CatalogTableIdentifier(table, None, None)
+
+  def apply(table: String, database: String): CatalogTableIdentifier =
+new CatalogTableIdentifier(table, Some(database), None)
+
+  def apply(table: String, database: String, catalog: String): 
CatalogTableIdentifier =
+new CatalogTableIdentifier(table, Some(database), Some(catalog))
+}
+
 /**
- * Identifies a table in a database.
- * If `database` is not defined, the current database is used.
- * When we register a permanent function in the FunctionRegistry, we use
- * unquotedString as the function name.
+ * Identifies a table in a database and catalog.
+ * If `database` is not defined, the current catalog's default database is 
used.
+ * If `catalog` is not defined, the current catalog is used.
--- End diff --

Agreed. This introduces the ability to expose a catalog to Spark. It 
doesn't actually add any user-facing operations.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21978: [SPARK-25006][SQL] Add CatalogTableIdentifier.

2018-11-29 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21978#discussion_r237585203
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala ---
@@ -18,48 +18,106 @@
 package org.apache.spark.sql.catalyst
 
 /**
- * An identifier that optionally specifies a database.
+ * An identifier that optionally specifies a database and catalog.
  *
  * Format (unquoted): "name" or "db.name"
  * Format (quoted): "`name`" or "`db`.`name`"
  */
-sealed trait IdentifierWithDatabase {
+sealed trait IdentifierWithOptionalDatabaseAndCatalog {
   val identifier: String
 
   def database: Option[String]
 
+  def catalog: Option[String]
+
   /*
* Escapes back-ticks within the identifier name with double-back-ticks.
*/
   private def quoteIdentifier(name: String): String = name.replace("`", 
"``")
 
   def quotedString: String = {
-val replacedId = quoteIdentifier(identifier)
-val replacedDb = database.map(quoteIdentifier(_))
-
-if (replacedDb.isDefined) s"`${replacedDb.get}`.`$replacedId`" else 
s"`$replacedId`"
+// database is required if catalog is present
+assert(database.isDefined || catalog.isEmpty)
+def q(s: String): String = s"`${quoteIdentifier(s)}`"
+Seq(catalog.map(q), database.map(q), 
Some(q(identifier))).flatten.mkString(".")
   }
 
   def unquotedString: String = {
-if (database.isDefined) s"${database.get}.$identifier" else identifier
+Seq(catalog, database, Some(identifier)).flatten.mkString(".")
   }
 
   override def toString: String = quotedString
 }
 
 
+object CatalogTableIdentifier {
+  def apply(table: String): CatalogTableIdentifier =
+new CatalogTableIdentifier(table, None, None)
+
+  def apply(table: String, database: String): CatalogTableIdentifier =
+new CatalogTableIdentifier(table, Some(database), None)
+
+  def apply(table: String, database: String, catalog: String): 
CatalogTableIdentifier =
+new CatalogTableIdentifier(table, Some(database), Some(catalog))
+}
+
 /**
- * Identifies a table in a database.
- * If `database` is not defined, the current database is used.
- * When we register a permanent function in the FunctionRegistry, we use
- * unquotedString as the function name.
+ * Identifies a table in a database and catalog.
+ * If `database` is not defined, the current catalog's default database is 
used.
+ * If `catalog` is not defined, the current catalog is used.
--- End diff --

No, we want to move away from a special global catalog. I think that Spark 
should have a current catalog, like a current database, which is used to 
resolve references that don't have an explicit catalog. That would have a 
default, just like the current database has a default.
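
A rough Scala sketch of that resolution order (all names below are hypothetical, not Spark internals):

```scala
// An identifier missing a catalog falls back to the session's current catalog,
// just as a missing database falls back to the current database.
case class SketchIdent(table: String, database: Option[String], catalog: Option[String])

def resolve(ident: SketchIdent, currentCatalog: String, currentDb: String): (String, String, String) =
  (ident.catalog.getOrElse(currentCatalog),
   ident.database.getOrElse(currentDb),
   ident.table)

// resolve(SketchIdent("t", None, None), "hive", "default") == ("hive", "default", "t")
```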


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...

2018-11-29 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21306
  
@stczwd, thanks for taking a look at this. What are the differences between 
batch and stream DDL that you think will come up?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r237179854
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -54,27 +53,17 @@ case class DataSourceV2ScanExec(
 Seq(output, source, options).hashCode()
   }
 
-  override def outputPartitioning: physical.Partitioning = readSupport 
match {
+  override def outputPartitioning: physical.Partitioning = scan match {
--- End diff --

If you take my suggestion above to inspect the `Scan` to build the string 
representation of this node, then I think the arguments should be `scan` and 
`output`. Then the batch can be fetched here.

For pushedFilters, I think that they should be fetched from the configured 
scan to build the string representation.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r237178976
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -23,29 +23,28 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, 
WholeStageCodegenExec}
-import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.DataSourceV2
 import org.apache.spark.sql.sources.v2.reader._
-import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory,
 ContinuousReadSupport, MicroBatchReadSupport}
 
 /**
- * Physical plan node for scanning data from a data source.
+ * Physical plan node for scanning a batch of data from a data source.
  */
 case class DataSourceV2ScanExec(
 output: Seq[AttributeReference],
 @transient source: DataSourceV2,
 @transient options: Map[String, String],
--- End diff --

With a catalog, there is no expectation that a `source` will be passed. 
This could be a string that identifies either the source or the catalog, for a 
good string representation of the physical plan. This is another area where I 
think `Table.name` would be helpful because the table's identifying information 
is really what should be shown instead of its source or catalog.

For options, these are part of the scan and aren't used to affect the 
behavior of this physical node. I think that means that they shouldn't be part 
of the node's arguments.

I think a good way to solve this problem is to change the pretty string 
format to use `Scan` instead. That has the information that defines what this 
node is doing, like the filters, projection, and options. And being able to 
convert a logical scan to text would be useful across all 3 execution modes.
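
As a sketch of what that could look like (hypothetical shapes, not the classes in this PR):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference

// The node keeps only its output and the configured scan; the pretty string is
// built from the scan's own description rather than from source/options/filters.
trait SketchScanDescription { def description(): String }

case class SketchScanString(output: Seq[AttributeReference], scan: SketchScanDescription) {
  def simpleString: String =
    s"ScanV2 ${output.map(_.name).mkString("[", ", ", "]")} ${scan.description()}"
}
```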


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r237176552
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
 import 
org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
-import org.apache.spark.sql.sources.v2.{BatchReadSupportProvider, 
DataSourceOptions, DataSourceV2}
+import org.apache.spark.sql.sources.v2._
--- End diff --

I am using an IDE for this review, but this makes future reviews harder. I 
realize it isn't a major issue, but I think it is a best practice to not use 
wildcard imports.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r237176100
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
+ * have a public, 0-arg constructor.
+ *
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ */
+@Evolving
+// TODO: do not extend `DataSourceV2`, after we finish the API refactor 
completely.
+public interface TableProvider extends DataSourceV2 {
+
+  /**
+   * Return a {@link Table} instance to do read/write with user-specified 
options.
+   *
+   * @param options the user-specified options that can identify a table, 
e.g. file path, Kafka
+   *topic name, etc. It's an immutable case-insensitive 
string-to-string map.
+   */
+  Table getTable(DataSourceOptions options);
+
+  /**
+   * Return a {@link Table} instance to do read/write with user-specified 
schema and options.
+   *
+   * By default this method throws {@link UnsupportedOperationException}, 
implementations should
--- End diff --

Strange, that page links to one with the opposite advice: 
http://www.javapractices.com/topic/TopicAction.do?Id=44

I think that `@throws` is a good idea whenever you want to document an 
exception type as part of the method contract. Since it is expected that this 
method isn't always implemented and may throw this exception, I think you were 
right to document it. And documenting exceptions is best done with `@throws` to 
highlight them in Javadoc.

The page you linked to makes the argument that unchecked exceptions aren't 
part of the method contract and cannot be relied on. But documenting this shows 
that it is part of the contract or expected behavior, so I think docs are 
appropriate.
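
For illustration, a Scaladoc-style sketch of documenting that expected exception (the trait and signature are hypothetical; the return type is a placeholder for Table):

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical provider, shown only to illustrate the @throws convention.
trait SketchTableProvider {
  /**
   * Returns a table configured with a user-specified schema.
   *
   * @throws UnsupportedOperationException if this source does not accept a
   *                                       user-specified schema
   */
  def getTable(options: Map[String, String], schema: StructType): AnyRef =
    throw new UnsupportedOperationException(
      s"${getClass.getName} does not accept a user-specified schema")
}
```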


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r237172065
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java 
---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
--- End diff --

> Can we delay the discussion when we have a PR to add catalog support 
after the refactor?

Yes, that works.

But, can we move `Table` to the `org.apache.spark.sql.catalog.v2` package 
where `TableCatalog` is defined in the other PR? I think `Table` should be 
defined with the catalog API and moving that later would require import changes 
to any file that references `Table`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...

2018-11-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23055#discussion_r237169532
  
--- Diff: python/pyspark/worker.py ---
@@ -22,7 +22,12 @@
 import os
 import sys
 import time
-import resource
+# 'resource' is a Unix specific module.
+has_resource_module = True
+try:
+import resource
+except ImportError:
+has_resource_module = False
--- End diff --

I'm -1 on this change.

I think the correct behavior is that Python should not fail if resource 
cannot be imported, and the JVM should not do anything differently.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23086: [SPARK-25528][SQL] data source v2 API refactor (batch re...

2018-11-27 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/23086
  
@cloud-fan, sorry to spread review comments over two days, but I've 
finished the first pass. Overall, it looks great.

I think we can simplify a couple of areas, like all of the args passed to 
the ScanExec node and its equals method. I'd also like to add `name` to table 
to return an identifying string (even if that is a set of options or paths in 
some cases).

Thanks for working on this!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236859358
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala 
---
@@ -396,87 +392,66 @@ object SimpleReaderFactory extends 
PartitionReaderFactory {
   }
 }
 
-abstract class SimpleReadSupport extends BatchReadSupport {
-  override def fullSchema(): StructType = new StructType().add("i", 
"int").add("j", "int")
-
-  override def newScanConfigBuilder(): ScanConfigBuilder = {
-NoopScanConfigBuilder(fullSchema())
-  }
+abstract class SimpleBatchTable extends Table with SupportsBatchRead  {
 
-  override def createReaderFactory(config: ScanConfig): 
PartitionReaderFactory = {
-SimpleReaderFactory
-  }
+  override def schema(): StructType = new StructType().add("i", 
"int").add("j", "int")
 }
 
+abstract class SimpleScanBuilder extends ScanBuilder
+  with Batch with Scan {
--- End diff --

I like that the API is flexible enough that `ScanBuilder`, `Scan`, and 
`Batch` can be the same object in simple cases.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236858793
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ---
@@ -116,16 +116,20 @@ object DataSourceV2Strategy extends Strategy {
|Output: ${output.mkString(", ")}
  """.stripMargin)
 
-  val scan = DataSourceV2ScanExec(
+  val batch = scan.toBatch
+  val partitions = batch.planInputPartitions()
+  val readerFactory = batch.createReaderFactory()
+  val plan = DataSourceV2ScanExec(
--- End diff --

I mentioned this above, but I think that DataSourceV2ScanExec only needs to 
be passed `output` and `batch`. That is, unless there is a benefit to calling 
`planInputPartitions` here, like an earlier failure?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236858449
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -54,27 +53,17 @@ case class DataSourceV2ScanExec(
 Seq(output, source, options).hashCode()
   }
 
-  override def outputPartitioning: physical.Partitioning = readSupport 
match {
+  override def outputPartitioning: physical.Partitioning = scan match {
--- End diff --

Should `SupportsReportPartitioning` extend `Batch` instead of `Scan`? Then 
this physical node could just be passed the `Batch` and not the `Scan`, 
`PartitionReaderFactory`, and partitions.

In fact, I think that this node only requires `output: 
Seq[AttributeReference], batch: Batch`.
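
A sketch of that reduced shape (the placeholder types below are hypothetical):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference

// Everything else the node needs can be derived lazily from the batch.
trait SketchBatch {
  def planInputPartitions(): Array[AnyRef]
  def createReaderFactory(): AnyRef
}

case class SketchBatchScanExec(output: Seq[AttributeReference], batch: SketchBatch) {
  lazy val partitions: Array[AnyRef] = batch.planInputPartitions()
  lazy val readerFactory: AnyRef = batch.createReaderFactory()
}
```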


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236858107
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -23,29 +23,28 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, 
WholeStageCodegenExec}
-import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.DataSourceV2
 import org.apache.spark.sql.sources.v2.reader._
-import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory,
 ContinuousReadSupport, MicroBatchReadSupport}
 
 /**
- * Physical plan node for scanning data from a data source.
+ * Physical plan node for scanning a batch of data from a data source.
  */
 case class DataSourceV2ScanExec(
 output: Seq[AttributeReference],
 @transient source: DataSourceV2,
 @transient options: Map[String, String],
--- End diff --

Similarly, options were used to create the `Scan` so they don't need to be 
passed here if they are not used in `equals` and `hashCode`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236857220
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -23,29 +23,28 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, 
WholeStageCodegenExec}
-import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.DataSourceV2
 import org.apache.spark.sql.sources.v2.reader._
-import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory,
 ContinuousReadSupport, MicroBatchReadSupport}
 
 /**
- * Physical plan node for scanning data from a data source.
+ * Physical plan node for scanning a batch of data from a data source.
  */
 case class DataSourceV2ScanExec(
 output: Seq[AttributeReference],
 @transient source: DataSourceV2,
--- End diff --

I think we can remove source by updating `equals` and `hashCode` to check 
just the `Scan`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236856960
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -23,29 +23,28 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, 
WholeStageCodegenExec}
-import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.DataSourceV2
 import org.apache.spark.sql.sources.v2.reader._
-import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory,
 ContinuousReadSupport, MicroBatchReadSupport}
 
 /**
- * Physical plan node for scanning data from a data source.
+ * Physical plan node for scanning a batch of data from a data source.
  */
 case class DataSourceV2ScanExec(
 output: Seq[AttributeReference],
 @transient source: DataSourceV2,
 @transient options: Map[String, String],
 @transient pushedFilters: Seq[Expression],
-@transient readSupport: ReadSupport,
-@transient scanConfig: ScanConfig)
+@transient scan: Scan,
+@transient partitions: Array[InputPartition],
+@transient readerFactory: PartitionReaderFactory)
   extends LeafExecNode with DataSourceV2StringFormat with 
ColumnarBatchScan {
 
   override def simpleString: String = "ScanV2 " + metadataString
 
   // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
   override def equals(other: Any): Boolean = other match {
-case other: DataSourceV2ScanExec =>
-  output == other.output && readSupport.getClass == 
other.readSupport.getClass &&
+case other: DataSourceV2StreamingScanExec =>
+  output == other.output && source.getClass == other.source.getClass &&
--- End diff --

Should this implement identity instead of equality? When would two ScanExec 
nodes be equal instead of identical?

Also, I don't think that this equals implementation is correct. First, it 
should not check for the streaming class. Second, it should check whether the 
scan is equal, not whether the options and the source are the same (plus, 
source will be removed).

Unfortunately, implementing true equality (not just identity) must in some 
way rely on a user-supplied class. A scan is the same if it will produce the 
same set of rows and columns in those rows. That means equality depends on the 
filter, projection, and source data (i.e. table). We can use `pushedFilters` 
and `output` for the filter and projection. But checking that the source data 
is the same requires using either the scan's `equals` method (which would also 
satisfy the filter and projection checks) or checking that the partitions are 
the same. Both `Scan` and `InputPartition` implementations are provided by 
sources, so their `equals` methods may not be implemented.

Because this must depend on checking equality of user-supplied objects, I 
think it would be much easier to make this depend only on equality of the 
`Scan`:

```
  override def equals(other: Any): Boolean = other match {
    case scanExec: DataSourceV2ScanExec => scanExec.scan == this.scan
    case _ => false
  }
```

That may fall back to identity if the user hasn't supplied an equals 
method, but I don't see a way to avoid it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236852153
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -170,15 +157,24 @@ object DataSourceV2Relation {
   }
 
   def create(
-  source: DataSourceV2,
+  provider: TableProvider,
+  table: SupportsBatchRead,
   options: Map[String, String],
   tableIdent: Option[TableIdentifier] = None,
   userSpecifiedSchema: Option[StructType] = None): 
DataSourceV2Relation = {
-val readSupport = source.createReadSupport(options, 
userSpecifiedSchema)
-val output = readSupport.fullSchema().toAttributes
+val output = table.schema().toAttributes
 val ident = tableIdent.orElse(tableFromOptions(options))
 DataSourceV2Relation(
-  source, readSupport, output, options, ident, userSpecifiedSchema)
+  provider, table, output, options, ident, userSpecifiedSchema)
+  }
+
+  def createRelationForWrite(
--- End diff --

Also note that this is temporary until the write side is finished?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236850263
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java 
---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.reader.Scan;
+import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface representing a logical structured data set of a data 
source. For example, the
+ * implementation can be a directory on the file system, a topic of Kafka, 
or a table in the
+ * catalog, etc.
+ *
+ * This interface can mixin the following interfaces to support different 
operations:
+ * 
+ *   {@link SupportsBatchRead}: this table can be read in batch 
queries.
+ * 
+ */
+@Evolving
+public interface Table {
--- End diff --

It would be helpful for a `Table` to also expose a name or identifier of 
some kind. The `TableIdentifier` passed into `DataSourceV2Relation` is only 
used in `name` to identify the relation's table. If the name (or location for 
path-based tables) were supplied by the table instead, it would remove the need 
to pass it in the relation.
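
A minimal sketch of what that removes (hypothetical names):

```scala
// If the table exposes its own name (or path), the relation can derive its
// display name from the table instead of carrying a separate TableIdentifier.
trait SketchNamedTable { def name: String }

case class SketchRelation(table: SketchNamedTable) {
  def name: String = table.name
}
```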


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236849290
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -40,8 +40,8 @@ import org.apache.spark.sql.types.StructType
  * @param userSpecifiedSchema The user-specified schema for this scan.
  */
 case class DataSourceV2Relation(
-source: DataSourceV2,
-readSupport: BatchReadSupport,
+source: TableProvider,
--- End diff --

May want to note that TableProvider will be removed when the write side is 
finished, since it is only used for `createWriteSupport`, which will be exposed 
through `Table`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236844174
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
 import 
org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
-import org.apache.spark.sql.sources.v2.{BatchReadSupportProvider, 
DataSourceOptions, DataSourceV2}
+import org.apache.spark.sql.sources.v2._
--- End diff --

Nit: using wildcard imports makes it harder to review without an IDE 
because it is more difficult to find out where symbols come from.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236823417
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.sources.v2.SupportsBatchRead;
+import org.apache.spark.sql.sources.v2.Table;
+
+/**
+ * A logical representation of a data source scan. This interface is used 
to provide logical
+ * information, like what the actual read schema is.
+ *
+ * This logical representation is shared between batch scan, micro-batch 
streaming scan and
+ * continuous streaming scan. Data sources must implement the 
corresponding methods in this
+ * interface, to match what the table promises to support. For example, 
{@link #toBatch()} must be
+ * implemented, if the {@link Table} that creates this {@link Scan} 
implements
+ * {@link SupportsBatchRead}.
+ */
+@Evolving
+public interface Scan {
+
+  /**
+   * Returns the actual schema of this data source scan, which may be 
different from the physical
+   * schema of the underlying storage, as column pruning or other 
optimizations may happen.
+   */
+  StructType readSchema();
+
+  /**
+   * Returns the physical representation of this scan for batch query. By 
default this method throws
+   * exception, data sources must overwrite this method to provide an 
implementation, if the
+   * {@link Table} that creates this scan implements {@link 
SupportsBatchRead}.
+   */
+  default Batch toBatch() {
+throw new UnsupportedOperationException("Do not support batch scan.");
--- End diff --

Nit: text should be "Batch scans are not supported". Starting with "Do not" 
makes the sentence a command.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236820896
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A physical representation of a data source scan for batch queries. This 
interface is used to
+ * provide physical information, like how many partitions the scanned data 
has, and how to read
+ * records from the partitions.
+ */
+@Evolving
+public interface Batch {
+
+  /**
+   * Returns a list of {@link InputPartition input partitions}. Each 
{@link InputPartition}
+   * represents a data split that can be processed by one Spark task. The 
number of input
+   * partitions returned here is the same as the number of RDD partitions 
this scan outputs.
+   *
+   * Note that, this may not be a full scan if the data source supports 
optimization like filter
+   * push-down. Implementations should check the status of {@link Scan} 
that creates this batch,
+   * and adjust the resulting {@link InputPartition input partitions}.
--- End diff --

I think this is a little unclear. Implementations do not necessarily check 
the scan. This Batch is likely configured with a filter and is responsible for 
creating splits for that filter.
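
A rough sketch, with hypothetical split and statistics types, of a batch that is created already holding its pushed filters and plans splits accordingly:

```scala
import org.apache.spark.sql.sources.{Filter, GreaterThan}

// Each candidate split carries a max value for the filtered column so the
// batch can skip splits that cannot match the pushed filter.
case class SketchSplit(path: String, maxValue: Long)

case class SketchFilteredBatch(splits: Seq[SketchSplit], pushedFilters: Seq[Filter]) {
  def planInputPartitions(): Seq[SketchSplit] =
    splits.filter { split =>
      pushedFilters.forall {
        case GreaterThan(_, v: Long) => split.maxValue > v
        case _ => true  // filters we cannot evaluate here are kept conservatively
      }
    }
}
```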


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236820065
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
+ * have a public, 0-arg constructor.
+ *
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ */
+@Evolving
+// TODO: do not extend `DataSourceV2`, after we finish the API refactor 
completely.
+public interface TableProvider extends DataSourceV2 {
+
+  /**
+   * Return a {@link Table} instance to do read/write with user-specified 
options.
+   *
+   * @param options the user-specified options that can identify a table, 
e.g. file path, Kafka
+   *topic name, etc. It's an immutable case-insensitive 
string-to-string map.
+   */
+  Table getTable(DataSourceOptions options);
+
+  /**
+   * Return a {@link Table} instance to do read/write with user-specified 
schema and options.
+   *
--- End diff --

Minor: Javadoc doesn't automatically parse empty lines as new paragraphs. 
If you want to have one in documentation, then use `<p>`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236819758
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
+ * have a public, 0-arg constructor.
+ *
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ */
+@Evolving
+// TODO: do not extend `DataSourceV2`, after we finish the API refactor 
completely.
+public interface TableProvider extends DataSourceV2 {
+
+  /**
+   * Return a {@link Table} instance to do read/write with user-specified 
options.
+   *
+   * @param options the user-specified options that can identify a table, 
e.g. file path, Kafka
+   *topic name, etc. It's an immutable case-insensitive 
string-to-string map.
+   */
+  Table getTable(DataSourceOptions options);
+
+  /**
+   * Return a {@link Table} instance to do read/write with user-specified 
schema and options.
+   *
+   * By default this method throws {@link UnsupportedOperationException}, 
implementations should
--- End diff --

Javadoc would normally also add `@throws` with this information. I agree it 
should be here as well.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236818511
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
+ * have a public, 0-arg constructor.
+ *
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ */
+@InterfaceStability.Evolving
+// TODO: do not extend `DataSourceV2`, after we finish the API refactor 
completely.
+public interface TableProvider extends DataSourceV2 {
+
+  /**
+   * Return a {@link Table} instance to do read/write with user-specified 
options.
+   *
+   * @param options the user-specified options that can identify a table, 
e.g. file path, Kafka
+   *topic name, etc. It's an immutable case-insensitive 
string-to-string map.
+   */
+  Table getTable(DataSourceOptions options);
+
+  /**
+   * Return a {@link Table} instance to do read/write with user-specified 
schema and options.
+   *
+   * By default this method throws {@link UnsupportedOperationException}, 
implementations should
+   * override this method to handle user-specified schema.
+   *
+   * @param options the user-specified options that can identify a table, 
e.g. file path, Kafka
+   *topic name, etc. It's an immutable case-insensitive 
string-to-string map.
+   * @param schema the user-specified schema.
+   */
+  default Table getTable(DataSourceOptions options, StructType schema) {
--- End diff --

I agree with @cloud-fan. These are slightly different uses.

Here, it is supplying a schema for how to interpret data files. Say you 
have CSV files with columns `id`, `ts`, and `data` and no headers. This tells 
the CSV reader what the columns are and how to convert the data to useful types 
(bigint, timestamp, and string). Column projection will later request those 
columns, maybe just `id` and `data`. If you only passed the projection schema, 
then the `ts` values would be returned for the `data` column.
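
A sketch of that usage pattern (the path is hypothetical; the column names are just the example above):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType, TimestampType}

val spark = SparkSession.builder().master("local[*]").appName("csv-schema-sketch").getOrCreate()

// The user-specified schema tells the source how to interpret headerless CSV.
val fileSchema = new StructType()
  .add("id", LongType)
  .add("ts", TimestampType)
  .add("data", StringType)

val df = spark.read.schema(fileSchema).csv("/path/to/headerless.csv")  // hypothetical path

// Column pruning is applied later; without the full schema above, the reader
// could not tell which raw column maps to `data`.
df.select("id", "data").show()
```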


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236816739
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java 
---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
--- End diff --

I can understand wanting to keep everything in Catalyst private. That's 
fine with me, but I think that Catalyst does need to be able to interact with 
tables and catalogs that are supplied by users.

For example: Our tables support schema evolution. Specifically, reading 
files that were written before a column was added. When we add a column, Spark 
shouldn't start failing in analysis for an AppendData operation in a scheduled 
job (as it would today). We need to be able to signal to the validation rule 
that the table supports reading files that are missing columns, so that Spark 
can apply the right validation and continue to allow writes that previously worked.

How would that information -- support for reading missing columns -- be 
communicated to the analyzer?
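
For concreteness, one shape such a signal could take is a capability mixin on `Table` that a validation rule can check. This is purely illustrative, not an API proposed here:

```scala
// Purely illustrative sketch: a hypothetical capability mixin that an analyzer
// validation rule could check before rejecting an AppendData plan whose input
// is missing columns that were recently added to the table.
trait Table { def name: String }
trait SupportsMissingColumns extends Table    // hypothetical marker trait

def acceptsMissingColumns(table: Table): Boolean = table match {
  case _: SupportsMissingColumns => true      // relax the by-name column check
  case _                         => false     // require the full table schema
}
```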

Also, what about my example above: how will the analyzer load tables using 
a user-supplied catalog if catalyst can't use any user-supplied implementations?

We could move all of the v2 analysis rules, like ResolveRelations, into the 
core module, but it seems to me that this requirement is no longer providing 
value if we have to do that. I think that catalyst is the right place for 
common plans and analysis rules to live because it is the library of common SQL 
components.

Wherever the rules and plans end up, they will need to access to the 
`TableCatalog` API.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236796331
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java 
---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.reader.Scan;
+import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface representing a logical structured data set of a data 
source. For example, the
+ * implementation can be a directory on the file system, a topic of Kafka, 
or a table in the
+ * catalog, etc.
+ *
+ * This interface can mixin the following interfaces to support different 
operations:
+ * 
+ *   {@link SupportsBatchRead}: this table can be read in batch 
queries.
+ * 
+ */
+@Evolving
+public interface Table {
+
+  /**
+   * Returns the schema of this table.
+   */
+  StructType schema();
+
+  /**
+   * Returns a {@link ScanBuilder} which can be used to build a {@link 
Scan} later. Spark will call
+   * this method for each data scanning query.
+   *
+   * The builder can take some query specific information to do operators 
pushdown, and keep these
+   * information in the created {@link Scan}.
+   */
+  ScanBuilder newScanBuilder(DataSourceOptions options);
--- End diff --

Either in a follow-up or you can add the class in this PR. Either way works 
for me.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-26 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236491385
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java 
---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.reader.Scan;
+import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface representing a logical structured data set of a data 
source. For example, the
+ * implementation can be a directory on the file system, a topic of Kafka, 
or a table in the
+ * catalog, etc.
+ *
+ * This interface can mixin the following interfaces to support different 
operations:
+ * 
+ *   {@link SupportsBatchRead}: this table can be read in batch 
queries.
+ * 
+ */
+@Evolving
+public interface Table {
+
+  /**
+   * Returns the schema of this table.
+   */
+  StructType schema();
+
+  /**
+   * Returns a {@link ScanBuilder} which can be used to build a {@link 
Scan} later. Spark will call
+   * this method for each data scanning query.
+   *
+   * The builder can take some query specific information to do operators 
pushdown, and keep these
+   * information in the created {@link Scan}.
+   */
+  ScanBuilder newScanBuilder(DataSourceOptions options);
--- End diff --

`DataSourceOptions` isn't simply a map for two main reasons, as far as I can 
tell: first, it forces options to be case insensitive, and second, it exposes 
helper methods to identify tables, like `tableName`, `databaseName`, and 
`paths`. In the new abstraction, the second use of `DataSourceOptions` is no 
longer needed. The table is already instantiated by the time that this is 
called.

We should reconsider `DataSourceOptions`. The `tableName` methods aren't 
needed and we also no longer need to forward properties from the session config 
because the way tables are configured has changed (catalogs handle that). I 
think we should remove this class and instead use the more direct 
implementation, `CaseInsensitiveStringMap` from #21306. The behavior of that 
class is obvious from its name and it would be shared between the v2 APIs, both 
catalog and data source.
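
As a rough illustration of the behavior being asked for here (this is a sketch, not the actual `CaseInsensitiveStringMap` from #21306):

```scala
import java.util.Locale

// A minimal case-insensitive string map: keys are normalized on the way in and
// on lookup, so "Path", "PATH", and "path" all resolve to the same entry.
class CaseInsensitiveStringMap(options: Map[String, String]) {
  private val delegate: Map[String, String] =
    options.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }

  def get(key: String): Option[String] = delegate.get(key.toLowerCase(Locale.ROOT))
  def getOrElse(key: String, default: String): String = get(key).getOrElse(default)
  def toMap: Map[String, String] = delegate
}

val opts = new CaseInsensitiveStringMap(Map("Path" -> "/data/events"))
assert(opts.get("path").contains("/data/events"))
```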


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...

2018-11-26 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23086#discussion_r236487464
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java 
---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
--- End diff --

#21306 (TableCatalog support) adds this class as 
`org.apache.spark.sql.catalog.v2.Table` in the `spark-catalyst` module. I think 
it needs to be in the catalyst module and should probably be in the 
`o.a.s.sql.catalog.v2` package as well.

The important one is moving this to the catalyst module. The analyzer is in 
catalyst and all of the v2 logical plans and analysis rules will be in catalyst 
as well, because we are standardizing behavior. The standard validation rules 
should be in catalyst, not in a source-specific or hive-specific package in the 
sql-core or hive modules.

Because the logical plans and validation rules are in the catalyst package, 
the `TableCatalog` API needs to be there as well. For example, when a [catalog 
table identifier](https://github.com/apache/spark/pull/21978) is resolved for a 
read query, one of the results is a `TableCatalog` instance for the catalog 
portion of the identifier. That catalog is used to load the v2 table, which is 
then wrapped in a v2 relation for further analysis. Similarly, the write path 
should also validate that the catalog exists during analysis by loading it, and 
would then pass the catalog in a v2 logical plan for `CreateTable` or 
`CreateTableAsSelect`.
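
A rough, self-contained sketch of that read-side flow (the `loadTable` signature and plan node names are illustrative stand-ins, not the actual #21306 API):

```scala
// Stand-in types; the real ones would live in catalyst alongside the analyzer.
trait Table { def name: String }
trait TableCatalog { def loadTable(ident: String): Table }

sealed trait LogicalPlan
case class UnresolvedCatalogRelation(catalog: String, ident: String) extends LogicalPlan
case class DataSourceV2Relation(table: Table) extends LogicalPlan

// The analyzer rule resolves the catalog portion of the identifier, asks the
// user-supplied catalog for the table, and wraps it in a v2 relation.
def resolveRelations(plan: LogicalPlan, catalogs: Map[String, TableCatalog]): LogicalPlan =
  plan match {
    case UnresolvedCatalogRelation(catalogName, ident) =>
      DataSourceV2Relation(catalogs(catalogName).loadTable(ident))
    case other => other
  }
```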

I also think that it makes sense to use the 
`org.apache.spark.sql.catalog.v2` package for `Table` because `Table` is more 
closely tied to the `TableCatalog` API than to the data source API. The link to 
DSv2 is that `Table` carries `newScanBuilder`, but the rest of the methods 
exposed by `Table` are for catalog functions, like inspecting a table's 
partitioning or table properties.

Moving this class would make adding `TableCatalog` less intrusive.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...

2018-11-26 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23055#discussion_r236480711
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -74,8 +74,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", 
true)
   // each python worker gets an equal part of the allocation. the worker 
pool will grow to the
   // number of concurrent tasks, which is determined by the number of 
cores in this executor.
-  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+  private val memoryMb = if (Utils.isWindows) {
--- End diff --

Documentation should be "For platforms where the `resource` API is 
available, python will limit its resource usage". The allocation on the JVM 
side is still the correct behavior.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...

2018-11-26 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23055#discussion_r236345625
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -74,8 +74,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", 
true)
   // each python worker gets an equal part of the allocation. the worker 
pool will grow to the
   // number of concurrent tasks, which is determined by the number of 
cores in this executor.
-  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+  private val memoryMb = if (Utils.isWindows) {
--- End diff --

> functionality is disabled in Python side

The only functionality that is disabled is limiting the memory space. The 
allocation for Python is still requested from resource managers. Setting the 
environment property tells python how much memory it was allocated, no matter 
how that is used or enforced.

> code consistency - usually the configuration is dealt with in JVM side if 
possible

The JVM is handling the setting by requesting that memory for python and 
passing on the amount requested to python. The fact that the python process 
can't limit itself doesn't affect how the JVM side should behave. This needlessly 
couples JVM and python behavior with an assumption that may not be true in the 
future.

> Why are you so against about disabling it in JVM side?

There is no benefit to disabling this. It is more code with no purpose and 
it makes assumptions about what python can or cannot do that aren't obvious.

What if pandas implements some method to spill to disk to limit memory 
consumption? Will implementers of that future feature know that the environment 
variable is not set when running on Windows? This adds complexity for no 
benefit because it doesn't change either the resource allocation in the JVM or 
the behavior of the python process. It only avoids sending valuable 
information. I see no reason for it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...

2018-11-20 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23055#discussion_r235082191
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -74,8 +74,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", 
true)
   // each python worker gets an equal part of the allocation. the worker 
pool will grow to the
   // number of concurrent tasks, which is determined by the number of 
cores in this executor.
-  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+  private val memoryMb = if (Utils.isWindows) {
--- End diff --

There is no configuration to change needed on the JVM side.

The JVM should communicate to Python how much memory it is allocated. If 
Python can limit itself to that amount, then that's fine. If the JVM doesn't 
expect Python to be able to limit, why would it not tell Python how much memory 
it was allocated?

There is no benefit to making this change that I can see.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...

2018-11-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23055#discussion_r234691652
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -74,8 +74,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", 
true)
   // each python worker gets an equal part of the allocation. the worker 
pool will grow to the
   // number of concurrent tasks, which is determined by the number of 
cores in this executor.
-  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+  private val memoryMb = if (Utils.isWindows) {
--- End diff --

@HyukjinKwon, what should the JVM side do differently if `resource` is not 
available?

I don't think it should do anything different. It should still allocate the 
python memory region when requesting resources from schedulers. The only 
difference is that python isn't self-limiting. Do you have an example of 
something that the JVM should change when running on Windows?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

2018-11-16 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/22547
  
I agree that there is consensus for the proposal in the design doc and I 
don't think there are any blockers. If there's something I can do to help, 
please let me know. Otherwise ping me to review!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...

2018-11-16 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23055#discussion_r234286173
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -74,8 +74,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", 
true)
   // each python worker gets an equal part of the allocation. the worker 
pool will grow to the
   // number of concurrent tasks, which is determined by the number of 
cores in this executor.
-  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+  private val memoryMb = if (Utils.isWindows) {
--- End diff --

My point is that if resource can't be loaded for any reason, the code 
shouldn't fail. As it is, if resource can't be loaded then that is handled, but 
if the memory limit is set then the worker will still try to use `resource`. That's 
what I think is brittle. There should be a flag for whether to attempt to use 
the resource API, based on whether it was loaded.

If the worker operates as I described, then why make any changes on the JVM 
side? Why avoid telling the worker how much memory it has?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...

2018-11-15 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23055#discussion_r234084002
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -74,8 +74,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", 
true)
   // each python worker gets an equal part of the allocation. the worker 
pool will grow to the
   // number of concurrent tasks, which is determined by the number of 
cores in this executor.
-  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+  private val memoryMb = if (Utils.isWindows) {
--- End diff --

I mean that it is brittle to try to use `resource` if the JVM has set the 
property. You handle the `ImportError`, but the JVM could set the request and 
Python would break again.

I think that this should not be entirely disabled on Windows. Resource 
requests to YARN or other schedulers should include this memory. The only 
feature that should be disabled is the resource limiting on the python side.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pyspark.me...

2018-11-15 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/23055
  
Thanks for fixing this so quickly, @HyukjinKwon! I'd like a couple of 
changes, but overall it is going in the right direction.

We should also plan on porting this to the 2.4 branch when it is committed 
since it is a regression.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...

2018-11-15 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23055#discussion_r234080578
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -74,8 +74,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", 
true)
   // each python worker gets an equal part of the allocation. the worker 
pool will grow to the
   // number of concurrent tasks, which is determined by the number of 
cores in this executor.
-  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
+  private val memoryMb = if (Utils.isWindows) {
--- End diff --

I don't think this is necessary. If `resource` can't be imported for any 
reason, then memory will not be limited in python. But the JVM side shouldn't 
be what determines whether that happens. The JVM should do everything the same 
way -- even requesting memory from schedulers like YARN because that space 
should still be allocated as python memory, even if python can't self-limit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...

2018-11-15 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23055#discussion_r234080290
  
--- Diff: python/pyspark/worker.py ---
@@ -268,9 +272,11 @@ def main(infile, outfile):
 
 # set up memory limits
 memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', 
"-1"))
-total_memory = resource.RLIMIT_AS
-try:
-if memory_limit_mb > 0:
+# 'PYSPARK_EXECUTOR_MEMORY_MB' should be undefined on Windows 
because it depends on
+# resource package which is a Unix specific package.
+if memory_limit_mb > 0:
--- End diff --

It seems brittle to disable this on the JVM side and rely on it here. Can 
we also set a flag in the ImportError case and check it here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-11-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r231707076
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * TableChange subclasses represent requested changes to a table. These 
are passed to
+ * {@link TableCatalog#alterTable}. For example,
+ * 
+ *   import TableChange._
+ *   val catalog = source.asInstanceOf[TableSupport].catalog()
+ *   catalog.alterTable(ident,
+ *   addColumn("x", IntegerType),
+ *   renameColumn("a", "b"),
+ *   deleteColumn("c")
+ * )
+ * 
+ */
+public interface TableChange {
--- End diff --

@mccheah, our table format supports updating the partitioning of a table, 
so I think it should be supported. But this is intended to be an initial API, 
so I didn't want to block it on agreement about how to repartition a table.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-11-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r231706583
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents table metadata from a {@link TableCatalog} or other table 
sources.
+ */
+public interface Table {
--- End diff --

I think the two `Table` classes are trying to be the same thing. This is 
one of the reasons why I brought it up in the sync. @cloud-fan's current PR 
isn't yet based on this work, so it doesn't get the abstraction right.

What you linked to uses `Table` to expose `newScanConfigBuilder`, basically 
requiring that all tables are readable. Instead, the implementation classes in 
#22547 should be interfaces that extend this `Table` to make it readable.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-11-02 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r230528510
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala
 ---
@@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType
  *   scenarios, where some offsets after the specified 
initial ones can't be
  *   properly read.
  */
-class KafkaContinuousReadSupport(
+class KafkaContinuousInputStream(
--- End diff --

I'd prefer that the commits themselves compile, but since this is 
separating the modes I think it could be done incrementally.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...

2018-10-22 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21306
  
@felixcheung, we're waiting on more reviews and a community decision about 
how to pass partition transforms.

For passing transforms, I think the most reasonable compromise is to go 
with a generic function application, so each transform would be passed as a 
function/transform name with one or more arguments, where each argument is 
either a column reference (by name) or a literal value. That's a fairly small 
public API addition, but it allows a lot of different partitioning schemes to 
be expressed, including the one above for Kudu.
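
A hedged sketch of what that generic function application could look like (the type names here are illustrative, not the API that would be merged):

```scala
// Each transform is a named function applied to column references and/or literals.
sealed trait TransformArg
case class ColumnReference(name: String) extends TransformArg
case class LiteralValue(value: Any) extends TransformArg

case class Transform(name: String, args: Seq[TransformArg])

// For example, daily partitions on `ts` plus a 16-bucket hash of `id`:
val partitioning = Seq(
  Transform("days", Seq(ColumnReference("ts"))),
  Transform("bucket", Seq(LiteralValue(16), ColumnReference("id"))))
```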

We already have all of this implemented based on the current PR, but I can 
update this in the next week or so.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

2018-10-19 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/22547
  
@jose-torres, I don't mean that the primary purpose of the v2 API is for 
catalog integration, I mean that the primary use of v2 is with tables that are 
stored in some catalog. So we should make sure that the plan and design work 
well with catalog tables.

Another reason that catalog tables are important is that the v2 plans 
require a catalog for consistent behavior. So catalogs are important and I 
think they will affect the implementation details.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226798538
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/Format.java ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The base interface for data source v2. Implementations must have a 
public, 0-arg constructor.
+ *
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ */
+@InterfaceStability.Evolving
+public interface Format extends DataSourceV2 {
+
+  /**
+   * Return a {@link Table} instance to do read/write with user-specified 
options.
+   *
+   * @param options the user-specified options that can identify a table, 
e.g. path, table name,
--- End diff --

Why is it necessary to pass table name and database to Format? Format 
should only be used in 2 places to create tables. First, in the DataFrameReader 
(or writer) API when a format is specified directly instead of a 
catalog/database/table or catalog/path. Second, it would be used in catalogs 
that support pluggable implementations, like the current session catalog, which 
needs to dynamically instantiate implementations based on the table's provider.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226798213
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/Format.java ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The base interface for data source v2. Implementations must have a 
public, 0-arg constructor.
+ *
+ * The major responsibility of this interface is to return a {@link Table} 
for read/write.
+ */
+@InterfaceStability.Evolving
+public interface Format extends DataSourceV2 {
--- End diff --

Why are there both Format and DataSourceV2? What does DataSourceV2 do?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

2018-10-19 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/22547
  
After looking at the changes, I want to reiterate that request for a design 
doc. I think that code is a great way to prototype a design, but that we need 
to step back and make sure that the design makes sense when you view it from a 
high level.

I have two main motivations for that point. First, there are some classes 
that I don't see a justification for, like having a separate ScanConfig, 
BatchScan, and PartitionReaderFactory. Are all of those separate classes 
necessary? Can a ScanConfigBuilder return a BatchScan? Can BatchScan expose a 
createBatchReader(InputPartition) method?

My second motivation for saying we need a clear design doc is that I think 
that the current way to interact with v2 doesn't fit well with catalogs. This 
is based around Format, which is based on the v1 method of loading read and 
write implementations. But that isn't the primary way that v2 will be 
used. It happens to be the only way to call into the v2 API from Spark today, 
but the primary use of v2 is to integrate sources that are actually modeled as 
tables in some catalog.

For example, Format exposes getTable that returns a Table implementation 
from DataSourceOptions. Those options have tableName and databaseName methods. 
But tables that are identified by name shouldn't be loaded by a Format; they 
should be loaded by a catalog. It also uses the options for both table options 
and read options because there isn't a way to pass both. But most tables will 
be created with table options by a catalog and will accept read-specific 
options passed to the DataFrameReader.

I think we would approach a usable API much sooner if this work was planned 
based on a shared understanding of how catalogs and tables will interact in the 
future. Not having a catalog API right now is affecting the way tables work in 
this PR, and that's a concern for me.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226796934
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -173,12 +185,17 @@ object DataSourceV2Relation {
   source: DataSourceV2,
   options: Map[String, String],
   tableIdent: Option[TableIdentifier] = None,
-  userSpecifiedSchema: Option[StructType] = None): 
DataSourceV2Relation = {
-val readSupport = source.createReadSupport(options, 
userSpecifiedSchema)
-val output = readSupport.fullSchema().toAttributes
+  userSpecifiedSchema: Option[StructType] = None): 
Option[DataSourceV2Relation] = {
--- End diff --

This shouldn't return an option. A relation is not just a read-side structure; 
it is also used in write-side logical plans as the target of a write. 
Validation rules like PreprocessTableInsertion validate the write dataframe 
against the relation's schema. That's why the relation has a newWriteSupport 
method.

Creating a relation from a Table should always work, even if the table 
isn't readable or isn't writable. Analysis can be done later to validate 
whether the plan that contains a relation can actually use the table.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226790252
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/InputStream.java
 ---
@@ -17,14 +17,18 @@
 
 package org.apache.spark.sql.sources.v2.reader.streaming;
 
-import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
 
 /**
- * A base interface for streaming read support. This is package private 
and is invisible to data
- * sources. Data sources should implement concrete streaming read support 
interfaces:
- * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ * An interface representing a readable data stream in a streaming query. 
It's responsible to manage
+ * the offsets of the streaming source in this streaming query.
+ *
+ * Data sources should implement concrete input stream interfaces: {@link 
MicroBatchInputStream} and
+ * {@link ContinuousInputStream}.
  */
-interface StreamingReadSupport extends ReadSupport {
+@InterfaceStability.Evolving
+public interface InputStream extends BaseStreamingSource {
--- End diff --

`InputStream` conflicts with a well-known JVM class, 
[`java.io.InputStream`](https://docs.oracle.com/javase/9/docs/api/java/io/InputStream.html).
 I think this should be renamed to be more specific to a streaming table scan.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226789748
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java 
---
@@ -15,37 +15,43 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.sources.v2.reader;
+package org.apache.spark.sql.sources.v2;
 
 import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.datasources.v2.NoopScanConfigBuilder;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+import org.apache.spark.sql.types.StructType;
 
 /**
- * An interface that defines how to load the data from data source for 
batch processing.
+ * An interface representing a logical structured data set of a data 
source. For example, the
+ * implementation can be a directory on the file system, or a table in the 
catalog, etc.
  *
- * The execution engine will get an instance of this interface from a data 
source provider
- * (e.g. {@link org.apache.spark.sql.sources.v2.BatchReadSupportProvider}) 
at the start of a batch
- * query, then call {@link #newScanConfigBuilder()} and create an instance 
of {@link ScanConfig}.
- * The {@link ScanConfigBuilder} can apply operator pushdown and keep the 
pushdown result in
- * {@link ScanConfig}. The {@link ScanConfig} will be used to create input 
partitions and reader
- * factory to scan data from the data source with a Spark job.
+ * This interface can mixin the following interfaces to support different 
operations:
+ * 
+ *   {@link SupportsBatchRead}: this table can be read in batch 
queries.
+ *   {@link SupportsMicroBatchRead}: this table can be read in 
streaming queries with
+ *   micro-batch trigger.
+ *   {@link SupportsContinuousRead}: this table can be read in 
streaming queries with
+ *   continuous trigger.
+ * 
  */
 @InterfaceStability.Evolving
-public interface BatchReadSupport extends ReadSupport {
+public interface Table {
+
+  /**
+   * Returns the schema of this table.
+   */
+  StructType schema();
 
   /**
* Returns a builder of {@link ScanConfig}. Spark will call this method 
and create a
* {@link ScanConfig} for each data scanning job.
*
* The builder can take some query specific information to do operators 
pushdown, and keep these
* information in the created {@link ScanConfig}.
-   *
-   * This is the first step of the data scan. All other methods in {@link 
BatchReadSupport} needs
-   * to take {@link ScanConfig} as an input.
-   */
-  ScanConfigBuilder newScanConfigBuilder();
-
-  /**
-   * Returns a factory, which produces one {@link PartitionReader} for one 
{@link InputPartition}.
*/
-  PartitionReaderFactory createReaderFactory(ScanConfig config);
+  default ScanConfigBuilder newScanConfigBuilder(DataSourceOptions 
options) {
--- End diff --

I think it should be clear that these are scan-specific options. Maybe add 
some documentation with an example of something that would be passed to 
configure a scan, like a target split size for combining.
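
For example, something like this (the option name and source are made up; it just shows a per-scan setting rather than an option that identifies the table):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("scan-options").getOrCreate()

// A hypothetical per-scan option: a target split size used when combining input files.
val df = spark.read
  .format("com.example.source")              // hypothetical v2 source
  .option("target-split-size", "134217728")  // 128 MB, applies only to this scan
  .load()
```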


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226789610
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java 
---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.BatchScan;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * A mix-in interface for {@link Table}. Table implementations can mixin 
this interface to
+ * provide data reading ability for batch processing.
+ */
+@InterfaceStability.Evolving
+public interface SupportsBatchRead extends Table {
+
+  /**
+   * Creates a {@link BatchScan} instance with a {@link ScanConfig} and 
user-specified options.
+   *
+   * @param config a {@link ScanConfig} which may contains operator 
pushdown information.
+   * @param options the user-specified options, which is same as the one 
used to create the
+   *{@link ScanConfigBuilder} that built the given {@link 
ScanConfig}.
+   */
+  BatchScan createBatchScan(ScanConfig config, DataSourceOptions options);
--- End diff --

Is there a benefit to having both `ScanConfig` and `BatchScan` objects? Why 
not have `ScanConfigBuilder` return a `BatchScan` directly by calling 
`buildBatch`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226785695
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchScan.java ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.SupportsBatchRead;
+import org.apache.spark.sql.sources.v2.Table;
+
+/**
+ * A {@link Scan} for batch queries.
+ *
+ * The execution engine will get an instance of {@link Table} first, then 
call
+ * {@link Table#newScanConfigBuilder(DataSourceOptions)} and create an 
instance of
+ * {@link ScanConfig}. The {@link ScanConfigBuilder} can apply operator 
pushdown and keep the
+ * pushdown result in {@link ScanConfig}. Then
+ * {@link SupportsBatchRead#createBatchScan(ScanConfig, 
DataSourceOptions)} will be called to create
+ * a {@link BatchScan} instance, which will be used to create input 
partitions and reader factory to
+ * scan data from the data source with a Spark job.
+ */
+@InterfaceStability.Evolving
+public interface BatchScan extends Scan {
+
+  /**
+   * Returns a factory, which produces one {@link PartitionReader} for one 
{@link InputPartition}.
+   */
+  PartitionReaderFactory createReaderFactory();
--- End diff --

Why are `BatchScan` and `PartitionReaderFactory` different interfaces?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226784919
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java 
---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.BatchScan;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * A mix-in interface for {@link Table}. Table implementations can mixin 
this interface to
+ * provide data reading ability for batch processing.
+ */
+@InterfaceStability.Evolving
+public interface SupportsBatchRead extends Table {
+
+  /**
+   * Creates a {@link BatchScan} instance with a {@link ScanConfig} and 
user-specified options.
+   *
+   * @param config a {@link ScanConfig} which may contains operator 
pushdown information.
+   * @param options the user-specified options, which is same as the one 
used to create the
+   *{@link ScanConfigBuilder} that built the given {@link 
ScanConfig}.
--- End diff --

I don't think that options should be passed twice.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226783272
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -106,85 +107,96 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
   failOnDataLoss(caseInsensitiveParams))
   }
 
-  /**
-   * Creates a 
[[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to 
read
-   * batches of Kafka data in a micro-batch streaming query.
-   */
-  override def createMicroBatchReadSupport(
-  metadataPath: String,
-  options: DataSourceOptions): KafkaMicroBatchReadSupport = {
-
-val parameters = options.asMap().asScala.toMap
-validateStreamOptions(parameters)
-// Each running query should use its own group id. Otherwise, the 
query may be only assigned
-// partial data since Kafka will assign partitions to multiple 
consumers having the same group
-// id. Hence, we should generate a unique id for each query.
-val uniqueGroupId = 
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
-val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase(Locale.ROOT), v) }
-val specifiedKafkaParams =
-  parameters
-.keySet
-.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-.map { k => k.drop(6).toString -> parameters(k) }
-.toMap
-
-val startingStreamOffsets = 
KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
-  STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
-
-val kafkaOffsetReader = new KafkaOffsetReader(
-  strategy(caseInsensitiveParams),
-  kafkaParamsForDriver(specifiedKafkaParams),
-  parameters,
-  driverGroupIdPrefix = s"$uniqueGroupId-driver")
-
-new KafkaMicroBatchReadSupport(
-  kafkaOffsetReader,
-  kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
-  options,
-  metadataPath,
-  startingStreamOffsets,
-  failOnDataLoss(caseInsensitiveParams))
+  override def getTable(options: DataSourceOptions): KafkaTable.type = {
+KafkaTable
   }
 
-  /**
-   * Creates a 
[[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to 
read
-   * Kafka data in a continuous streaming query.
-   */
-  override def createContinuousReadSupport(
-  metadataPath: String,
-  options: DataSourceOptions): KafkaContinuousReadSupport = {
-val parameters = options.asMap().asScala.toMap
-validateStreamOptions(parameters)
-// Each running query should use its own group id. Otherwise, the 
query may be only assigned
-// partial data since Kafka will assign partitions to multiple 
consumers having the same group
-// id. Hence, we should generate a unique id for each query.
-val uniqueGroupId = 
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
-val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase(Locale.ROOT), v) }
-val specifiedKafkaParams =
-  parameters
-.keySet
-.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-.map { k => k.drop(6).toString -> parameters(k) }
-.toMap
+  object KafkaTable extends Table
--- End diff --

Why is `KafkaTable` an object, not a class? This doesn't seem to fit the 
abstraction.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

2018-10-19 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/22547
  
@cloud-fan, is there a design doc that outlines these changes and the new 
API structure?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22547#discussion_r226782371
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala
 ---
@@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType
  *   scenarios, where some offsets after the specified 
initial ones can't be
  *   properly read.
  */
-class KafkaContinuousReadSupport(
+class KafkaContinuousInputStream(
--- End diff --

Is it possible to break this change into multiple PRs for batch, 
microbatch, and continuous? It's really large and it would be nice if we could 
get the changes in incrementally.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r226780862
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -169,15 +174,16 @@ object DataSourceV2Relation {
   options: Map[String, String],
   tableIdent: Option[TableIdentifier] = None,
   userSpecifiedSchema: Option[StructType] = None): 
DataSourceV2Relation = {
-val reader = source.createReader(options, userSpecifiedSchema)
+val readSupport = source.createReadSupport(options, 
userSpecifiedSchema)
--- End diff --

In the long term, I don't think that sources should use the reader to get a 
schema. This is a temporary hack until we have catalog support, which is really 
where schemas should come from.

The way this works in our version (which is substantially ahead of upstream 
Spark, unfortunately), is that a Table is loaded by a Catalog. The schema 
reported by that table is used to validate writes. That way, the table can 
report it's schema and Spark knows that data written must be compatible with 
that schema, but the source isn't required to be readable.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22501: [SPARK-25492][TEST] Refactor WideSchemaBenchmark ...

2018-10-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22501#discussion_r226765772
  
--- Diff: sql/core/benchmarks/WideSchemaBenchmark-results.txt ---
@@ -1,117 +1,145 @@
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
-Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
+================================================================================================
+parsing large select expressions
+================================================================================================
 
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 parsing large select:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-1 select expressions                              2 /    4          0.0     2050147.0       1.0X
-100 select expressions                            6 /    7          0.0     6123412.0       0.3X
-2500 select expressions                         135 /  141          0.0   134623148.0       0.0X
+1 select expressions                              2 /    4          0.0     1934953.0       1.0X
+100 select expressions                            4 /    5          0.0     3659399.0       0.5X
+2500 select expressions                          68 /   76          0.0    68278937.0       0.0X
 
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
-Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
 
+================================================================================================
+many column field read and write
+================================================================================================
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 many column field r/w:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-1 cols x 100000 rows (read in-mem)               16 /   18          6.3         158.6       1.0X
-1 cols x 100000 rows (exec in-mem)               17 /   19          6.0         166.7       1.0X
-1 cols x 100000 rows (read parquet)              24 /   26          4.3         235.1       0.7X
-1 cols x 100000 rows (write parquet)             81 /   85          1.2         811.3       0.2X
-100 cols x 1000 rows (read in-mem)               17 /   19          6.0         166.2       1.0X
-100 cols x 1000 rows (exec in-mem)               25 /   27          4.0         249.2       0.6X
-100 cols x 1000 rows (read parquet)              23 /   25          4.4         226.0       0.7X
-100 cols x 1000 rows (write parquet)             83 /   87          1.2         831.0       0.2X
-2500 cols x 40 rows (read in-mem)               132 /  137          0.8        1322.9       0.1X
-2500 cols x 40 rows (exec in-mem)               326 /  330          0.3        3260.6       0.0X
-2500 cols x 40 rows (read parquet)              831 /  839          0.1        8305.8       0.0X
-2500 cols x 40 rows (write parquet)             237 /  245          0.4        2372.6       0.1X
-
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
-Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
+1 cols x 100000 rows (read in-mem)               22 /   25          4.6         219.4       1.0X
+1 cols x 100000 rows (exec in-mem)               22 /   28          4.5         223.8       1.0X
+1 cols x 100000 rows (read parquet)              45 /   49          2.2         449.6       0.5X
+1 cols x 100000 rows (write parquet)            204 /  223          0.5        2044.4       0.1X
--- End diff --

@dongjoon-hyun, so you are saying that it doesn't appear that there is a 
performance regression, right?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

2018-10-17 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/22547
  
@cloud-fan, sorry to look at this so late; I was out on vacation for a 
little while. Is this about ready for review?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22573: [SPARK-25558][SQL] Pushdown predicates for nested fields...

2018-10-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/22573
  
@dongjoon-hyun, Iceberg schema evolution is based on the field IDs, not on 
names. The current table schema's names are the runtime names for columns in 
that table, and all reads happen by first translating those names to IDs and 
projecting the IDs from the data files. That way, renames can never cause you 
to get incorrect data.

You're mostly right that Spark has a problem with schema evolution for 
HadoopFS tables. That wouldn't affect my suggestion here, though. If you're 
filtering or projecting field `m.n`, then Spark currently handles that by 
matching columns by name. If you're matching by name, then `m.n` can't change 
across versions, or at least you can always project `m.n` from the data (in the 
case of Avro).
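
To make that concrete, here is a tiny sketch of ID-based resolution; the names 
below are illustrative, not Iceberg's actual classes:

```scala
// Illustrative sketch of ID-based column resolution; not Iceberg's actual API.
// The table schema maps current names to permanent field IDs, and data files
// are projected by ID, so a rename cannot bind to the wrong column.
case class Field(id: Int, name: String)
case class Schema(fields: Seq[Field]) {
  def idOf(name: String): Int = fields.find(_.name == name).get.id
}

object RenameSafety {
  def main(args: Array[String]): Unit = {
    val v1 = Schema(Seq(Field(1, "id"), Field(2, "n")))      // field 2 named "n"
    val v2 = Schema(Seq(Field(1, "id"), Field(2, "count")))  // field 2 renamed to "count"

    // A read against the current schema resolves "count" to ID 2, so data
    // files written while the column was still called "n" are read correctly.
    assert(v2.idOf("count") == 2)
    assert(v1.idOf("n") == v2.idOf("count"))
  }
}
```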


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22573: [SPARK-25558][SQL] Pushdown predicates for nested fields...

2018-10-01 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/22573
  
The approach we've taken in Iceberg is to allow `.` in names by using an 
index in the top-level schema. The full path of every leaf in the schema is 
produced and added to a map from the full field name to the field's ID.

The reason why we do this is to avoid problem areas:

* Parsing the name using `.` as a delimiter
* Traversing the schema structure

For example, the schema `0: a struct<2: x int, 3: y int>, 1: a.z int` 
produces this index: `Map("a" -> 0, "a.x" -> 2, "a.y" -> 3, "a.z" -> 1)`.

Binding filters like `a.x > 3` or `a.z < 5` is done using the index instead 
of parsing the field name and traversing the schema, so you get the right result 
without needing to decide whether "a.x" refers to a nested field or to a top-level 
field whose actual name contains the dot. The lookup is quick and correctly 
produces `id(2) > 3` and `id(1) < 5`. The same index is also used for projection, 
because users want to be able to select nested columns by name using dotted 
field names.

The only drawback to this approach is that you can't have duplicates in the 
index: each full field name must be unique. In the example above, the top-level 
`a.z` field could not be named `a.x` or else it would collide with `x` nested 
in `a`.
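
A compressed sketch of how that index is built and used (illustrative names, not 
the actual Iceberg code):

```scala
// Illustrative sketch of the full-path index described above.
sealed trait FieldType
case object IntType extends FieldType
case class StructOf(fields: Seq[Field]) extends FieldType
case class Field(id: Int, name: String, dataType: FieldType)

object NameIndex {
  // Map every field's full dotted path to its ID, so binding a dotted name
  // is a lookup rather than a parse-and-traverse.
  def index(fields: Seq[Field], prefix: String = ""): Map[String, Int] =
    fields.flatMap { f =>
      val path = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
      val nested = f.dataType match {
        case StructOf(children) => index(children, path)
        case _                  => Map.empty[String, Int]
      }
      Map(path -> f.id) ++ nested
    }.toMap

  def main(args: Array[String]): Unit = {
    // Schema: 0: a struct<2: x int, 3: y int>, 1: a.z int
    val schema = Seq(
      Field(0, "a", StructOf(Seq(Field(2, "x", IntType), Field(3, "y", IntType)))),
      Field(1, "a.z", IntType))

    val idx = index(schema)
    assert(idx == Map("a" -> 0, "a.x" -> 2, "a.y" -> 3, "a.z" -> 1))

    // Binding `a.x > 3` and `a.z < 5` becomes id(2) > 3 and id(1) < 5.
    assert(idx("a.x") == 2 && idx("a.z") == 1)
  }
}
```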


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22413: [SPARK-25425][SQL] Extra options should override session...

2018-09-19 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/22413
  
Thanks @MaxGekk, sorry for the original omission!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   3   4   5   6   7   8   9   10   >