[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/23208 @cloud-fan, what are you suggesting to use as a design? If you think this shouldn't mirror the read side, then let's be clear on what it should look like. Maybe that's a design doc, or maybe that's a discussion thread on the mailing list. Whatever option we go for, we still need to have a plan for exposing the replace-by-filter and replace-dynamic-partitions methods, whatever they end up being. We also need the life-cycle to match. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r239889152 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,52 +17,49 @@ package org.apache.spark.sql.execution.datasources.v2 -import java.util.UUID - -import scala.collection.JavaConverters._ +import java.util.{Optional, UUID} import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport import org.apache.spark.sql.types.StructType /** - * A logical plan representing a data source v2 scan. + * A logical plan representing a data source v2 table. * - * @param source An instance of a [[DataSourceV2]] implementation. - * @param options The options for this scan. Used to create fresh [[BatchWriteSupport]]. - * @param userSpecifiedSchema The user-specified schema for this scan. + * @param table The table that this relation represents. + * @param options The options for this table operation. It's used to create fresh [[ScanBuilder]] + *and [[BatchWriteSupport]]. */ case class DataSourceV2Relation( -// TODO: remove `source` when we finish API refactor for write. -source: TableProvider, -table: SupportsBatchRead, +table: Table, output: Seq[AttributeReference], -options: Map[String, String], -userSpecifiedSchema: Option[StructType] = None) +// TODO: use a simple case insensitive map instead. +options: DataSourceOptions) --- End diff -- A private method to do that existed in the past. 
Why not just revive it?
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r239888975 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java --- @@ -25,14 +25,14 @@ import org.apache.spark.sql.types.StructType; /** - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * A mix-in interface for {@link Table}. Data sources can implement this interface to * provide data writing ability for batch processing. * * This interface is used to create {@link BatchWriteSupport} instances when end users run * {@code Dataset.write.format(...).option(...).save()}. */ @Evolving -public interface BatchWriteSupportProvider extends DataSourceV2 { +public interface SupportsBatchWrite extends Table { --- End diff -- I'm fine either way, as long as we are consistent between the read and write sides.
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r239888795 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java --- @@ -25,7 +25,10 @@ * The base interface for v2 data sources which don't have a real catalog. Implementations must * have a public, 0-arg constructor. * - * The major responsibility of this interface is to return a {@link Table} for read/write. + * The major responsibility of this interface is to return a {@link Table} for read/write. If you + * want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter` + * with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The + * table schema can be empty in this case. --- End diff -- I think we can remove SaveMode right away. We don't need to break existing use cases if we add the OverwriteData plan and use it when the user's mode is Overwrite. That helps us get to the point where we can integrate SQL on top of this faster.
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r239613722 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java --- @@ -25,14 +25,14 @@ import org.apache.spark.sql.types.StructType; /** - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * A mix-in interface for {@link Table}. Data sources can implement this interface to * provide data writing ability for batch processing. * * This interface is used to create {@link BatchWriteSupport} instances when end users run * {@code Dataset.write.format(...).option(...).save()}. */ @Evolving -public interface BatchWriteSupportProvider extends DataSourceV2 { +public interface SupportsBatchWrite extends Table { --- End diff -- `Table` exposes `newScanBuilder` without an interface. Why should the write side be different? Doesn't Spark support sources that are read-only or write-only? I think that both reads and writes should use interfaces to mix support into `Table`, or both should be exposed by `Table` and throw `UnsupportedOperationException` by default, not a mix of the two options.
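The second option above puts both reads and writes on `Table` and throws `UnsupportedOperationException` by default. The following is a minimal sketch of that style. `ScanBuilder`, `WriteBuilder`, `newWriteBuilder`, and `ReadOnlyTable` are hypothetical stand-ins written for illustration; they are not Spark's API.

```java
import java.util.Map;

// Hypothetical builder types, reduced to one method each for illustration.
interface ScanBuilder { String describe(); }
interface WriteBuilder { String describe(); }

// Both capabilities live on Table; a source overrides only what it supports.
interface Table {
  String name();

  default ScanBuilder newScanBuilder(Map<String, String> options) {
    throw new UnsupportedOperationException(name() + " does not support reads");
  }

  default WriteBuilder newWriteBuilder(Map<String, String> options) {
    throw new UnsupportedOperationException(name() + " does not support writes");
  }
}

// A read-only source: overrides the read side and inherits the failing write side.
class ReadOnlyTable implements Table {
  @Override
  public String name() { return "read_only"; }

  @Override
  public ScanBuilder newScanBuilder(Map<String, String> options) {
    return () -> "scan of " + name();
  }
}
```

With this style, Spark discovers a missing capability only when it calls the method. The mix-in style instead lets Spark check the type up front, before planning.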
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r239613088 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,52 +17,49 @@ package org.apache.spark.sql.execution.datasources.v2 -import java.util.UUID - -import scala.collection.JavaConverters._ +import java.util.{Optional, UUID} import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport import org.apache.spark.sql.types.StructType /** - * A logical plan representing a data source v2 scan. + * A logical plan representing a data source v2 table. * - * @param source An instance of a [[DataSourceV2]] implementation. - * @param options The options for this scan. Used to create fresh [[BatchWriteSupport]]. - * @param userSpecifiedSchema The user-specified schema for this scan. + * @param table The table that this relation represents. + * @param options The options for this table operation. It's used to create fresh [[ScanBuilder]] + *and [[BatchWriteSupport]]. */ case class DataSourceV2Relation( -// TODO: remove `source` when we finish API refactor for write. -source: TableProvider, -table: SupportsBatchRead, +table: Table, output: Seq[AttributeReference], -options: Map[String, String], -userSpecifiedSchema: Option[StructType] = None) +// TODO: use a simple case insensitive map instead. 
+options: DataSourceOptions) --- End diff -- Why change this now, when DataSourceOptions will be replaced? I would say just leave it as a map and update it once later.
[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/23208 @cloud-fan, I see that this adds `Table` and uses `TableProvider`, but I was expecting this to also update the write side to mirror the read side, like PR #22190 for [SPARK-25188](https://issues.apache.org/jira/browse/SPARK-25188) (originally proposed in [discussion on SPARK-24882](https://issues.apache.org/jira/browse/SPARK-24882?focusedCommentId=16581725=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16581725)). The main parts that we discussed there were:

* Mirror the read side structure by adding WriteConfig. Now, that would be adding a WriteBuilder.
* Mirroring the read life-cycle of ScanBuilder and Scan, to enable use cases like acquiring and holding a write lock, for example.
* Using the WriteBuilder to expose more write configuration to support overwrite and dynamic partition overwrite.

We don't need to add the overwrite mix-ins here, but I would expect to see a WriteBuilder that returns a Write. (`Table -> WriteBuilder -> Write` matches `Table -> ScanBuilder -> Scan`.) The Write would expose BatchWrite and StreamWrite (if they are different) or could directly expose the WriteFactory, commit, abort, etc. WriteBuilder would be extensible so that SupportsOverwrite and SupportsDynamicOverwrite can be added as mix-ins at some point.
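The `Table -> WriteBuilder -> Write` life-cycle described above can be sketched roughly as follows. This is a toy, assuming an in-memory table. `InMemoryTable`, the `overwrite(String)` signature, and the prefix-based "filter" are all hypothetical, chosen only to show two things: the builder accepts an optional mix-in, and the `Write` holds a lock from creation until `commit` or `abort`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical write-side mirror of Scan: the Write owns commit/abort.
interface Write {
  void commit(List<String> rows);
  void abort();
}

interface WriteBuilder {
  Write build();
}

// Optional mix-in; a builder that implements it supports overwrite-by-filter.
interface SupportsOverwrite extends WriteBuilder {
  WriteBuilder overwrite(String filter);
}

class InMemoryTable {
  final List<String> data = new ArrayList<>();
  boolean locked = false;

  WriteBuilder newWriteBuilder() {
    return new InMemoryWriteBuilder(this);
  }
}

class InMemoryWriteBuilder implements SupportsOverwrite {
  private final InMemoryTable table;
  private String overwriteFilter = null;

  InMemoryWriteBuilder(InMemoryTable table) { this.table = table; }

  @Override
  public WriteBuilder overwrite(String filter) {
    this.overwriteFilter = filter;
    return this;
  }

  @Override
  public Write build() {
    // The lock is acquired when the Write is created and held until commit or abort.
    if (table.locked) throw new IllegalStateException("table is locked");
    table.locked = true;
    final String filter = overwriteFilter;
    return new Write() {
      @Override
      public void commit(List<String> rows) {
        // Toy "filter": delete existing rows that start with the given prefix.
        if (filter != null) table.data.removeIf(r -> r.startsWith(filter));
        table.data.addAll(rows);
        table.locked = false;
      }

      @Override
      public void abort() {
        table.locked = false;
      }
    };
  }
}
```

A caller checks for the mix-in with `instanceof SupportsOverwrite` before calling `overwrite`. That is how extra write configuration could be layered on later without changing `WriteBuilder` itself.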
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r239598346 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- @@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { assertNotBucketed("save") -val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) -if (classOf[DataSourceV2].isAssignableFrom(cls)) { - val source = cls.getConstructor().newInstance().asInstanceOf[DataSourceV2] - source match { -case provider: BatchWriteSupportProvider => - val sessionOptions = DataSourceV2Utils.extractSessionConfigs( -source, -df.sparkSession.sessionState.conf) - val options = sessionOptions ++ extraOptions - +val session = df.sparkSession +val cls = DataSource.lookupDataSource(source, session.sessionState.conf) +if (classOf[TableProvider].isAssignableFrom(cls)) { + val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider] + val sessionOptions = DataSourceV2Utils.extractSessionConfigs( +provider, session.sessionState.conf) + val options = sessionOptions ++ extraOptions + val dsOptions = new DataSourceOptions(options.asJava) + provider.getTable(dsOptions) match { +case table: SupportsBatchWrite => + val relation = DataSourceV2Relation.create(table, dsOptions) + // TODO: revisit it. We should not create the `AppendData` operator for `SaveMode.Append`. + // We should create new end-users APIs for the `AppendData` operator. 
--- End diff -- Here is what my branch uses for this logic:

```scala
val maybeTable = provider.getTable(identifier)
val exists = maybeTable.isDefined
(exists, mode) match {
  case (true, SaveMode.ErrorIfExists) =>
    throw new AnalysisException(s"Table already exists: ${identifier.quotedString}")

  case (true, SaveMode.Overwrite) =>
    val relation = DataSourceV2Relation.create(
      catalog.name, identifier, maybeTable.get, options)

    runCommand(df.sparkSession, "insertInto") {
      OverwritePartitionsDynamic.byName(relation, df.logicalPlan)
    }

  case (true, SaveMode.Append) =>
    val relation = DataSourceV2Relation.create(
      catalog.name, identifier, maybeTable.get, options)

    runCommand(df.sparkSession, "save") {
      AppendData.byName(relation, df.logicalPlan)
    }

  case (false, SaveMode.Append) |
       (false, SaveMode.ErrorIfExists) |
       (false, SaveMode.Ignore) |
       (false, SaveMode.Overwrite) =>

    runCommand(df.sparkSession, "save") {
      CreateTableAsSelect(catalog, identifier, Seq.empty, df.logicalPlan, options,
        ignoreIfExists = mode == SaveMode.Ignore)
    }

  case _ =>
    // table exists and mode is ignore
}
```
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r239596456 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- @@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { assertNotBucketed("save") -val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) -if (classOf[DataSourceV2].isAssignableFrom(cls)) { - val source = cls.getConstructor().newInstance().asInstanceOf[DataSourceV2] - source match { -case provider: BatchWriteSupportProvider => - val sessionOptions = DataSourceV2Utils.extractSessionConfigs( -source, -df.sparkSession.sessionState.conf) - val options = sessionOptions ++ extraOptions - +val session = df.sparkSession +val cls = DataSource.lookupDataSource(source, session.sessionState.conf) +if (classOf[TableProvider].isAssignableFrom(cls)) { + val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider] + val sessionOptions = DataSourceV2Utils.extractSessionConfigs( +provider, session.sessionState.conf) + val options = sessionOptions ++ extraOptions + val dsOptions = new DataSourceOptions(options.asJava) + provider.getTable(dsOptions) match { +case table: SupportsBatchWrite => + val relation = DataSourceV2Relation.create(table, dsOptions) + // TODO: revisit it. We should not create the `AppendData` operator for `SaveMode.Append`. + // We should create new end-users APIs for the `AppendData` operator. --- End diff -- The example in the referenced comment is this:

```scala
spark.range(1).write.format("source").save("non-existent-path")
```

If a path for a path-based table doesn't exist, then I think that the table doesn't exist. If a table doesn't exist, then the operation for `save` should be CTAS instead of AppendData. Here, I think the right behavior is to check whether the provider returns a table. If it doesn't, then the table doesn't exist and the plan should be CTAS.
If it does, then it must provide the schema used to validate the AppendData operation. Since we don't currently have CTAS, this should throw an exception stating that the table doesn't exist and can't be created. More generally, the meaning of SaveMode with v1 is not always reliable. I think the right approach is what @cloud-fan suggests: create a new write API for v2 tables that is clear for the new logical plans (I've proposed one and would be happy to open a PR). Once the logical plans are in place, we can go back through this API and move it over to v2 where the behaviors match.
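The check proposed above (no table means CTAS, which is unavailable so fail; an existing table supplies the schema that validates AppendData) can be sketched as a pure function. `TableInfo` and `SavePlanner` are hypothetical names. The schema check is simplified to "same column names, same order", which is an assumption and not Spark's actual by-name resolution.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical view of what a provider returns for an existing table.
class TableInfo {
  final String name;
  final List<String> columns;

  TableInfo(String name, List<String> columns) {
    this.name = name;
    this.columns = columns;
  }
}

class SavePlanner {
  // Returns the name of the logical plan that save() would use.
  static String planSave(Optional<TableInfo> maybeTable, List<String> dataColumns) {
    if (!maybeTable.isPresent()) {
      // The table does not exist, so the plan should be CTAS; without CTAS, fail clearly.
      throw new IllegalStateException("Table does not exist and cannot be created");
    }
    TableInfo table = maybeTable.get();
    // The existing table's schema validates the AppendData operation (simplified check).
    if (!table.columns.equals(dataColumns)) {
      throw new IllegalArgumentException(
          "Cannot append to " + table.name + ": expected columns " + table.columns
              + " but got " + dataColumns);
    }
    return "AppendData";
  }
}
```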
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r239581374 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java --- @@ -25,7 +25,10 @@ * The base interface for v2 data sources which don't have a real catalog. Implementations must * have a public, 0-arg constructor. * - * The major responsibility of this interface is to return a {@link Table} for read/write. + * The major responsibility of this interface is to return a {@link Table} for read/write. If you + * want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter` + * with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The + * table schema can be empty in this case. --- End diff -- Maybe it should also be part of the `TableProvider` contract that if the table can't be located, it throws an exception?
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r239578059 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java --- @@ -25,7 +25,10 @@ * The base interface for v2 data sources which don't have a real catalog. Implementations must * have a public, 0-arg constructor. * - * The major responsibility of this interface is to return a {@link Table} for read/write. + * The major responsibility of this interface is to return a {@link Table} for read/write. If you + * want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter` + * with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The + * table schema can be empty in this case. --- End diff -- @jose-torres, create on write is done by CTAS. It should not be left up to the source whether to fail or create. I think the confusion here is that this is a degenerate case where Spark has no ability to interact with the table's metadata. Spark must assume that it exists because the caller is writing to it. The caller is indicating that a table exists, is identified by some configuration, and that a specific implementation can be used to write to it. That's what happens today when source implementations are directly specified.
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r239559037 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java --- @@ -25,7 +25,10 @@ * The base interface for v2 data sources which don't have a real catalog. Implementations must * have a public, 0-arg constructor. * - * The major responsibility of this interface is to return a {@link Table} for read/write. + * The major responsibility of this interface is to return a {@link Table} for read/write. If you + * want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter` + * with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The + * table schema can be empty in this case. --- End diff -- What does it mean to write to a non-existing table? If you're writing somewhere, the table must exist. This is for creating a table directly from configuration and an implementation class in the DataFrameWriter API. The target of the write still needs to exist.
[GitHub] spark issue #23055: [SPARK-26080][PYTHON] Skips Python resource limit on Win...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/23055 @HyukjinKwon, for the future, I should note that I'm not a committer so my +1 for a PR is not binding. I'm fairly sure @vanzin would +1 this commit as well, but it's best not to merge based on my approval.
[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/23208 Thanks for posting this PR @cloud-fan! I'll have a look in the next day or so.
[GitHub] spark issue #23055: [SPARK-26080][PYTHON] Skips Python resource limit on Win...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/23055 +1 with the latest changes. Thanks for taking care of this, @HyukjinKwon! The functionality is in two parts: changing the resource requests (which doesn't change) and limiting memory use in Python. It is too bad that this broke, but I'm not sure how to deal with a platform that, as you say, has few contributors. I certainly wouldn't want to gate a feature like this on making sure someone tested it in Windows, unless we have CI set up for Windows builds.
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r238046730 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransforms.java --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +/** + * A standard set of transformations that are passed to data sources during table creation. + * + * @see PartitionTransform + */ +public class PartitionTransforms { + private PartitionTransforms() { + } + + /** + * Create a transform for a column with the given name. + * + * This transform is used to pass named transforms that are not known to Spark. + * + * @param transform a name of the transform to apply to the column + * @param colName a column name + * @return an Apply transform for the column + */ + public static PartitionTransform apply(String transform, String colName) { +if ("identity".equals(transform)) { --- End diff -- I think we should get this done now. Partition transforms are a generalization of Hive partitioning (which uses some columns directly) and bucketing (which is one specific transform). 
If we add transformation functions now, we will support both of those with a simple API instead of building in special cases for identity and bucket transforms. I also have a data source that allows users to configure partitioning using more transforms than just identity and bucketing, so I'd like to get this in so that DDL for those tables works.
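The argument above is that one transform abstraction covers Hive-style identity partitioning, bucketing, and any other named function. A minimal sketch of that idea follows. `Transform`, `Identity`, `Bucket`, `Apply`, and `Transforms` are hypothetical classes written for illustration; they are not the classes in this PR. Only the `apply(transform, colName)` shape and the `"identity"` special case follow the diff quoted in the comment.

```java
// A common base for every partition transform passed to a data source.
abstract class Transform {
  abstract String describe();
}

// Hive-style partitioning: use the column value directly.
class Identity extends Transform {
  final String column;
  Identity(String column) { this.column = column; }
  String describe() { return "identity(" + column + ")"; }
}

// Bucketing is one specific transform with an extra parameter.
class Bucket extends Transform {
  final int numBuckets;
  final String column;
  Bucket(int numBuckets, String column) { this.numBuckets = numBuckets; this.column = column; }
  String describe() { return "bucket(" + numBuckets + ", " + column + ")"; }
}

// Any other named transform, unknown to Spark, is passed through to the source.
class Apply extends Transform {
  final String name;
  final String column;
  Apply(String name, String column) { this.name = name; this.column = column; }
  String describe() { return name + "(" + column + ")"; }
}

final class Transforms {
  private Transforms() {}

  static Transform apply(String transform, String colName) {
    if ("identity".equals(transform)) {
      return new Identity(colName);
    }
    return new Apply(transform, colName);
  }

  static Transform bucket(int numBuckets, String colName) {
    return new Bucket(numBuckets, colName);
  }
}
```

In this sketch, a source that understands extra transforms (for example a hypothetical `day(ts)`) receives them as `Apply` instances. Spark does not need a special case for each one.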
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237995405 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; --- End diff -- I just went to make this change, but it requires moving any SQL class from catalyst referenced by the API into the API module as well... Let's discuss the options more on the dev list thread.
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r237984050 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.types.StructType; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public interface TableCatalog extends CatalogProvider { --- End diff -- What about `CatalogPlugin`? I'm hesitant to go with just `Catalog` because it isn't very specific. I think it might cause confusion because the interface has only the `initialize` method.
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21306 @stczwd, I agree with @mccheah. Tables are basically named data sets. Whether they support batch, micro-batch streaming, or continuous streaming is determined by checking whether they implement SupportsBatchScan or similar interfaces. Matt's referenced docs are the right place to go for more context. The purpose here is to make catalogs and reads orthogonal. A catalog can return both batch-compatible and stream-compatible source "tables". A "table" may be a Kafka topic or may be a file-based data source. And note that both of those can support batch and streaming execution. A Kafka topic could be a CDC stream that represents a table, and a file-based source could be streamed by periodically checking for new committed files. This PR is based on an [SPIP](https://docs.google.com/document/d/1zLFiA1VuaWeVxeTDXNg8bL6GP3BVoOZBkewFtEnjEoo/edit#heading=h.7vhjx9226jbt). That has some background for why I chose the set of table attributes here (schema, partitioning, properties), but a short summary is that those are the core set of attributes that are used in comparable SQL variants and already used in Spark.
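The comment says that execution support is found by checking which interfaces a table implements. A minimal sketch of that check follows. `SupportsBatchScan` is named in the comment. `SupportsStreamScan`, the scan method signatures, `TopicTable`, and `Capabilities` are assumptions made only for illustration.

```java
// A catalog hands back a plain Table: a named data set.
interface Table {
  String name();
}

// Capability mix-ins (hypothetical method signatures).
interface SupportsBatchScan extends Table {
  String batchScan();
}

interface SupportsStreamScan extends Table {
  String streamScan(long fromOffset);
}

// A Kafka-like topic can be read both as a batch and as a stream.
class TopicTable implements SupportsBatchScan, SupportsStreamScan {
  public String name() { return "events"; }
  public String batchScan() { return "batch scan of " + name(); }
  public String streamScan(long fromOffset) { return "stream " + name() + " from " + fromOffset; }
}

final class Capabilities {
  private Capabilities() {}

  // Execution support is discovered by type checks, independent of the catalog.
  static boolean canBatch(Table t) { return t instanceof SupportsBatchScan; }
  static boolean canStream(Table t) { return t instanceof SupportsStreamScan; }
}
```

Because the catalog only returns `Table`, the same catalog can serve batch-only, stream-only, and dual-mode tables without changing its interface.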
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r237975013 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogProvider.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.internal.SQLConf; + +/** + * A marker interface to provide a catalog implementation for Spark. + * + * Implementations can provide catalog functions by implementing additional interfaces, like + * {@link TableCatalog} to expose table operations. + * + * Catalog implementations must implement this marker interface to be loaded by + * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the + * required public no-arg constructor. After creating an instance, it will be configured by calling + * {@link #initialize(CaseInsensitiveStringMap)}. + * + * Catalog implementations are registered to a name by adding a configuration option to Spark: + * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. 
All configuration properties + * in the Spark configuration that share the catalog name prefix, + * {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the case insensitive + * string map of options in initialization with the prefix removed. An additional property, + * {@code name}, is also added to the options and will contain the catalog's name; in this case, + * "catalog-name". + */ +public interface CatalogProvider { --- End diff -- @cloud-fan, do you want me to create the `sql-api` package in this PR, or do you want to add a separate PR to move the current v2 API?
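The registration convention in the Javadoc above has three parts. `spark.sql.catalog.<name>` names the class. Keys under `spark.sql.catalog.<name>.` are passed with the prefix removed, in a case-insensitive map. A `name` entry is added. The helper below is a hypothetical sketch of that convention over a plain `Map`; it is not Spark's loader code.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper that follows the documented convention; not Spark's loader.
final class CatalogConf {
  private CatalogConf() {}

  static final String PREFIX = "spark.sql.catalog.";

  // The implementation class registered for a catalog name, or null if none is set.
  static String implClass(Map<String, String> conf, String catalogName) {
    return conf.get(PREFIX + catalogName);
  }

  // Collects spark.sql.catalog.<name>.<key>=<value> entries with the prefix removed,
  // plus a "name" entry, into a case-insensitive map.
  static Map<String, String> catalogOptions(Map<String, String> conf, String catalogName) {
    String prefix = PREFIX + catalogName + ".";
    Map<String, String> options = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    for (Map.Entry<String, String> e : conf.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        options.put(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    options.put("name", catalogName);
    return options;
  }
}
```

The trailing `.` in the prefix ensures that properties for a differently named catalog sharing a leading substring (for example `prod` vs `production`) are not mixed together.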
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r237974718 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransforms.java --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +/** + * A standard set of transformations that are passed to data sources during table creation. + * + * @see PartitionTransform + */ +public class PartitionTransforms { + private PartitionTransforms() { + } + + /** + * Create a transform for a column with the given name. + * + * This transform is used to pass named transforms that are not known to Spark. + * + * @param transform a name of the transform to apply to the column + * @param colName a column name + * @return an Apply transform for the column + */ + public static PartitionTransform apply(String transform, String colName) { +if ("identity".equals(transform)) { --- End diff -- I should note that the generic function application will probably look like the `Apply` case.
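The sketch below approximates the dispatch that `PartitionTransforms.apply` appears to perform. A known name such as `identity` maps to a dedicated transform. Any other name falls through to a generic `Apply`-style transform that Spark passes along by name. All class names here (`Transform`, `IdentityTransform`, `ApplyTransform`, `Transforms`) are hypothetical stand-ins, not the PR's actual classes.

```java
import java.util.Objects;

// Hypothetical stand-ins for the PR's PartitionTransform classes.
abstract class Transform {
  final String column;

  Transform(String column) {
    this.column = Objects.requireNonNull(column, "column");
  }

  abstract String describe();
}

/** A transform Spark knows about: partition by the column value itself. */
final class IdentityTransform extends Transform {
  IdentityTransform(String column) {
    super(column);
  }

  @Override
  String describe() {
    return column;
  }
}

/** A named transform that Spark does not know about, passed through to the source by name. */
final class ApplyTransform extends Transform {
  final String name;

  ApplyTransform(String name, String column) {
    super(column);
    this.name = Objects.requireNonNull(name, "name");
  }

  @Override
  String describe() {
    return name + "(" + column + ")";
  }
}

final class Transforms {
  private Transforms() {
  }

  /** Maps known transform names to dedicated classes and everything else to ApplyTransform. */
  static Transform apply(String transform, String colName) {
    if ("identity".equals(transform)) {
      return new IdentityTransform(colName);
    }
    return new ApplyTransform(transform, colName);
  }
}
```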
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r237974410 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.types.StructType; + +import java.util.List; +import java.util.Map; + +/** + * Represents table metadata from a {@link TableCatalog} or other table sources. --- End diff -- No, this interface carries the minimal set of operations needed to implement the v2 logical plans. We can expand it later when we need to. The goal here is to build a replacement catalog API incrementally and to avoid requiring all catalogs to implement all possible catalog features. This API is focused on table operations, not view or function operations that we have yet to define.
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r237973548 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/V1MetadataTable.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2 + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.catalog.v2.PartitionTransforms.{bucket, identity} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.DataSourceReader +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType + +/** + * An implementation of catalog v2 [[Table]] to expose v1 table metadata. 
+ */ +private[sql] class V1MetadataTable( +v1Table: CatalogTable, +v2Source: Option[DataSourceV2]) extends Table { + + def readDelegate: ReadSupport = v2Source match { +case r: ReadSupport => r +case _ => throw new UnsupportedOperationException(s"Source does not support reads: $v2Source") + } + + def writeDelegate: WriteSupport = v2Source match { +case w: WriteSupport => w +case _ => throw new UnsupportedOperationException(s"Source does not support writes: $v2Source") + } + + lazy val options: Map[String, String] = { +v1Table.storage.locationUri match { --- End diff -- I use `lazy` for a couple of reasons. First, to avoid building maps or other data values that are never used. Second, to avoid a required ordering for fields: if fields depend on one another, then they have to be reordered when those dependencies change. Lazy values never require reordering.
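Java has no `lazy val`. A small memoizing wrapper can still illustrate the two properties described in the comment. First, a value is built only on first use. Second, the result does not depend on declaration order. This is a hypothetical, single-threaded sketch. Scala's `lazy val` is also thread-safe, which this `Lazy` class is not. `LazyTableSketch` and its `path` entry are invented for the example.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

/** Memoizes a supplier: the value is computed once, on first access. Not thread-safe. */
final class Lazy<T> implements Supplier<T> {
  private Supplier<T> init;
  private T value;

  Lazy(Supplier<T> init) {
    this.init = init;
  }

  @Override
  public T get() {
    if (init != null) {
      value = init.get();
      init = null; // drop the initializer once the value is cached
    }
    return value;
  }
}

/** Hypothetical table whose lazily built fields depend on each other. */
final class LazyTableSketch {
  int optionsBuilds = 0;

  // Declared before `options` but still safe: nothing is evaluated until get() is called.
  final Lazy<String> description = new Lazy<>(() -> "table with options " + options().keySet());

  private final Lazy<Map<String, String>> options = new Lazy<>(() -> {
    optionsBuilds++;
    return Collections.singletonMap("path", "/tmp/t");
  });

  Map<String, String> options() {
    return options.get();
  }
}
```

With eager initialization, evaluating `description` first would see a null `options` field. Deferring both values removes that ordering constraint.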
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r237972742 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/V1MetadataTable.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2 + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.catalog.v2.PartitionTransforms.{bucket, identity} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.DataSourceReader +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType + +/** + * An implementation of catalog v2 [[Table]] to expose v1 table metadata. 
+ */ +private[sql] class V1MetadataTable( +v1Table: CatalogTable, +v2Source: Option[DataSourceV2]) extends Table { + + def readDelegate: ReadSupport = v2Source match { +case r: ReadSupport => r +case _ => throw new UnsupportedOperationException(s"Source does not support reads: $v2Source") + } + + def writeDelegate: WriteSupport = v2Source match { +case w: WriteSupport => w +case _ => throw new UnsupportedOperationException(s"Source does not support writes: $v2Source") + } + + lazy val options: Map[String, String] = { +v1Table.storage.locationUri match { --- End diff -- How would the `getOrElse` pattern work here? If the URI is undefined, what tuple should be added to the table properties?
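One reading of the question: when `locationUri` is absent, there is no sensible default entry to add. The location therefore belongs in the map only when it is present, which `getOrElse` does not express. Below is a minimal Java sketch of that pattern using `Optional.ifPresent`. The `path` key and the `TableOptionsSketch.options` helper are assumptions for illustration, since the diff is cut off before the `match` arms.

```java
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: the "path" key and the helper name are assumptions, not the PR's code.
final class TableOptionsSketch {
  private TableOptionsSketch() {
  }

  static Map<String, String> options(Map<String, String> properties, Optional<URI> locationUri) {
    Map<String, String> result = new HashMap<>(properties);
    // Add an entry only when a location exists; there is no default tuple to fall back to.
    locationUri.ifPresent(uri -> result.put("path", uri.toString()));
    return Collections.unmodifiableMap(result);
  }
}
```

In Scala the equivalent is a `match` whose `None` arm returns the properties unchanged, or `locationUri.map(...).getOrElse(properties)` applied to the whole map rather than to a single tuple.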
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r237972182 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.types.StructType; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public interface TableCatalog extends CatalogProvider { --- End diff -- The intent is to use some interface to load all catalogs, whether they implement `TableCatalog`, `FunctionCatalog`, or both (or other catalog API parts). So you load a catalog, then check whether it is a `TableCatalog` when you want to use it for tables. Sounds like the name `CatalogProvider` is the confusing part. You're right that a provider usually implements a get method to provide something. I could change that to `CatalogImpl` or something. Would that work? 
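The loading pattern described above has two steps. Every catalog is instantiated through one marker interface, and a capability such as table support is checked only when it is needed. In plain Java this might look as follows. The interfaces `CatalogPlugin`, `TablesCapability`, and `FunctionsCapability`, the example `TableOnlyCatalog`, and the helper `CatalogLoaderSketch` are all hypothetical stand-ins for `CatalogProvider`, `TableCatalog`, and `FunctionCatalog`. Only the flow of a public no-arg constructor followed by `initialize` comes from the `CatalogProvider` Javadoc.

```java
import java.util.Map;

// Hypothetical stand-ins: CatalogPlugin ~ CatalogProvider, TablesCapability ~ TableCatalog.
interface CatalogPlugin {
  void initialize(String name, Map<String, String> options);
}

interface TablesCapability extends CatalogPlugin {
  boolean tableExists(String ident);
}

interface FunctionsCapability extends CatalogPlugin {
  boolean functionExists(String ident);
}

/** Example catalog that supports tables but not functions. */
class TableOnlyCatalog implements TablesCapability {
  private String name;

  public TableOnlyCatalog() {
    // Required public no-arg constructor; configuration happens in initialize().
  }

  @Override
  public void initialize(String name, Map<String, String> options) {
    this.name = name;
  }

  @Override
  public boolean tableExists(String ident) {
    return false;
  }

  @Override
  public String toString() {
    return "TableOnlyCatalog(" + name + ")";
  }
}

final class CatalogLoaderSketch {
  private CatalogLoaderSketch() {
  }

  /** Loads any catalog through the marker interface, then initializes it. */
  static CatalogPlugin load(String className, String name, Map<String, String> options) {
    try {
      Class<?> cls = Class.forName(className);
      if (!CatalogPlugin.class.isAssignableFrom(cls)) {
        throw new IllegalArgumentException(className + " does not implement CatalogPlugin");
      }
      CatalogPlugin catalog = (CatalogPlugin) cls.getDeclaredConstructor().newInstance();
      catalog.initialize(name, options);
      return catalog;
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate catalog class " + className, e);
    }
  }

  /** Checks for table support only at the point where table operations are needed. */
  static TablesCapability asTableCatalog(CatalogPlugin catalog) {
    if (catalog instanceof TablesCapability) {
      return (TablesCapability) catalog;
    }
    throw new UnsupportedOperationException("Catalog does not support tables: " + catalog);
  }
}
```

Because the loader only depends on the marker interface, a catalog can add function support later without changing how it is registered or loaded.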
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r237971241 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.types.StructType; + +import java.util.List; +import java.util.Map; + +/** + * Represents table metadata from a {@link TableCatalog} or other table sources. + */ +public interface Table { + /** + * Return the table properties. + * @return this table's map of string properties + */ + Map properties(); --- End diff -- Yeah, that works.
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r237971288 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.types.StructType; + +import java.util.List; +import java.util.Map; + +/** + * Represents table metadata from a {@link TableCatalog} or other table sources. + */ +public interface Table { + /** + * Return the table properties. + * @return this table's map of string properties + */ + Map properties(); + + /** + * Return the table schema. + * @return this table's schema as a struct type + */ + StructType schema(); + + /** + * Return the table partitioning transforms. + * @return this table's partitioning transforms + */ + List partitioning(); --- End diff -- Sure
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r237971092 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransforms.java --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +/** + * A standard set of transformations that are passed to data sources during table creation. + * + * @see PartitionTransform + */ +public class PartitionTransforms { + private PartitionTransforms() { + } + + /** + * Create a transform for a column with the given name. + * + * This transform is used to pass named transforms that are not known to Spark. + * + * @param transform a name of the transform to apply to the column + * @param colName a column name + * @return an Apply transform for the column + */ + public static PartitionTransform apply(String transform, String colName) { +if ("identity".equals(transform)) { --- End diff -- What I wanted to discuss on Wednesday was how to pass these transforms. @rxin and I had some discussions about it on the dev list, but we didn't come up with a decision. 
I think the solution will probably be to add a way to pass a generic function application and a list of arguments that are either columns or constant literals.
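Here is a hedged sketch of the direction proposed here. A generic function application carries a function name and a list of arguments, each either a column reference or a constant literal. The types `Arg`, `ColumnRef`, `Literal`, and `FunctionTransform` are assumptions for illustration, and so is the SQL-like rendering. The thread says no API had been decided yet.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical shape for a generic transform: a function name plus column or literal arguments.
interface Arg {
  String sql();
}

/** An argument that refers to a column by name. */
final class ColumnRef implements Arg {
  final String name;

  ColumnRef(String name) {
    this.name = name;
  }

  @Override
  public String sql() {
    return "`" + name.replace("`", "``") + "`";
  }
}

/** A constant argument; strings are single-quoted, other values use their string form. */
final class Literal implements Arg {
  final Object value;

  Literal(Object value) {
    this.value = value;
  }

  @Override
  public String sql() {
    if (value instanceof String) {
      return "'" + ((String) value).replace("'", "''") + "'";
    }
    return String.valueOf(value);
  }
}

final class FunctionTransform {
  final String function;
  final List<Arg> args;

  FunctionTransform(String function, Arg... args) {
    this.function = function;
    this.args = Collections.unmodifiableList(Arrays.asList(args));
  }

  /** Renders the application as function(arg1, arg2, ...). */
  String sql() {
    return function + "(" + args.stream().map(Arg::sql).collect(Collectors.joining(", ")) + ")";
  }
}
```

With this shape, a bucketing transform is just `bucket` applied to a literal count and a column, so Spark does not need a dedicated class for each transform a source might support.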
[GitHub] spark issue #23086: [SPARK-25528][SQL] data source v2 API refactor (batch re...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/23086 @cloud-fan, thanks for getting this done! I'll wait for the equivalent write-side PR.
[GitHub] spark issue #23086: [SPARK-25528][SQL] data source v2 API refactor (batch re...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/23086 > I still do not think we should mix the catalog support with the data source APIs We are trying to keep these separate. `Table` is the only overlap between the two. If you prefer more separation, we could move the `newScanBuilder` method to a separate interface that readable data sources implement. > Catalog is a well-defined concept in database systems, as what Spark SQL follows. The so-called "table catalog" is not a catalog to me. I'm glad that you're interested in joining the discussion on multi-catalog support. Let's have that discussion on the catalog issues or discussion threads on the dev list, not here on an update to the read API.
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237966188 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.sources.v2.SupportsBatchRead; +import org.apache.spark.sql.sources.v2.Table; + +/** + * A logical representation of a data source scan. This interface is used to provide logical + * information, like what the actual read schema is. + * + * This logical representation is shared between batch scan, micro-batch streaming scan and + * continuous streaming scan. Data sources must implement the corresponding methods in this + * interface, to match what the table promises to support. For example, {@link #toBatch()} must be + * implemented, if the {@link Table} that creates this {@link Scan} implements + * {@link SupportsBatchRead}. 
+ * + */ +@Evolving +public interface Scan { + + /** + * Returns the actual schema of this data source scan, which may be different from the physical + * schema of the underlying storage, as column pruning or other optimizations may happen. + */ + StructType readSchema(); + + /** + * A description string of this scan, which may includes information like: what filters are + * configured for this scan, what's the value of some important options like path, etc. The + * description doesn't need to include {@link #readSchema()}, as Spark already knows it. + * + * By default this returns the class name of the implementation. Please override it to provide a + * meaningful description. + * + */ + default String description() { --- End diff -- What about adding `pushedFilters` that defaults to `new Filter[0]`? Then users should override that to add filters to the description, if they are pushed. I think a Scan should be able to report its options, especially those that distinguish it from other scans, like pushed filters. I guess we could have some wrapper around the user-provided Scan that holds the Scan options. I would want to standardize that instead of doing it in every scan exec node.
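The wrapper idea above can be sketched as a small holder class. It keeps the user's scan together with the options it was built with and renders one standardized description, so exec nodes do not each build their own string. `UserScan` and `ScanWithOptions` are hypothetical names, and the `Options: [...]` format is invented for the example. Options are sorted only to make the output deterministic.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical stand-in for the user-provided Scan; only its description matters here.
interface UserScan {
  String description();
}

/** Holds a user scan plus its options and renders one standardized description. */
final class ScanWithOptions {
  private final UserScan scan;
  private final Map<String, String> options;

  ScanWithOptions(UserScan scan, Map<String, String> options) {
    this.scan = scan;
    // Sorted copy so the rendered string is deterministic.
    this.options = Collections.unmodifiableMap(new TreeMap<>(options));
  }

  UserScan scan() {
    return scan;
  }

  /** One shared format instead of per-exec-node string building. */
  String describe() {
    String opts = options.entrySet().stream()
        .map(e -> e.getKey() + "=" + e.getValue())
        .collect(Collectors.joining(", "));
    return scan.description() + (opts.isEmpty() ? "" : " Options: [" + opts + "]");
  }
}
```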
[GitHub] spark issue #23055: [SPARK-26080][PYTHON] Skips Python resource limit on Win...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/23055 +1 once the docs are updated to note that resource requests still include Python memory, even on Windows.
[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Skips Python resource limit...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23055#discussion_r237963488 --- Diff: docs/configuration.md --- @@ -190,6 +190,8 @@ of the most common options to set are: and it is up to the application to avoid exceeding the overhead memory space shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory is added to executor resource requests. + +NOTE: This configuration is not supported on Windows. --- End diff -- I agree. A better note would be something like "Python memory usage may not be limited on platforms that do not support resource limiting, such as Windows".
[GitHub] spark issue #21978: [SPARK-25006][SQL] Add CatalogTableIdentifier.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21978 Rebased on master.
[GitHub] spark issue #23086: [SPARK-25528][SQL] data source v2 API refactor (batch re...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/23086 +1. There are only minor suggestions left from me. I'd like to see the default implementation of `Table.name` removed, but I don't think that should block committing this.
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237670228 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala --- @@ -22,86 +22,56 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.SinglePartition +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} -import org.apache.spark.sql.execution.streaming.continuous._ -import org.apache.spark.sql.sources.v2.DataSourceV2 import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport, MicroBatchReadSupport} /** - * Physical plan node for scanning data from a data source. + * Physical plan node for scanning a batch of data from a data source. */ case class DataSourceV2ScanExec( output: Seq[AttributeReference], -@transient source: DataSourceV2, -@transient options: Map[String, String], -@transient pushedFilters: Seq[Expression], -@transient readSupport: ReadSupport, -@transient scanConfig: ScanConfig) - extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan { +scanDesc: String, +@transient batch: Batch) --- End diff -- Sounds good to me.
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237670099 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.sources.v2.SupportsBatchRead; +import org.apache.spark.sql.sources.v2.Table; + +/** + * A logical representation of a data source scan. This interface is used to provide logical + * information, like what the actual read schema is. + * + * This logical representation is shared between batch scan, micro-batch streaming scan and + * continuous streaming scan. Data sources must implement the corresponding methods in this + * interface, to match what the table promises to support. For example, {@link #toBatch()} must be + * implemented, if the {@link Table} that creates this {@link Scan} implements + * {@link SupportsBatchRead}. 
+ * + */ +@Evolving +public interface Scan { + + /** + * Returns the actual schema of this data source scan, which may be different from the physical + * schema of the underlying storage, as column pruning or other optimizations may happen. + */ + StructType readSchema(); + + /** + * A description string of this scan, which may includes information like: what filters are + * configured for this scan, what's the value of some important options like path, etc. The + * description doesn't need to include {@link #readSchema()}, as Spark already knows it. + * + * By default this returns the class name of the implementation. Please override it to provide a + * meaningful description. + * + */ + default String description() { --- End diff -- I would have expected the default implementation to show both pushed filters and the read schema, along with the implementation class name. Read schema can be accessed by `readSchema`. Should there also be a way to access the pushed filters? `pushedFilters` seems like a good idea to me. (This can be added later)
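The sketch below shows one possible default `description()` of the kind described here. It combines the implementation class name, the read schema, and any pushed filters, and `pushedFilters()` defaults to an empty array. To stay self-contained it uses strings in place of Spark's `StructType` and `Filter`. `DescribedScan`, `ParquetLikeScan`, and the `PushedFilters: [...]` format are hypothetical.

```java
import java.util.Arrays;

// Hypothetical sketch: strings stand in for Spark's StructType (read schema) and Filter.
interface DescribedScan {
  String readSchema();

  /** Filters pushed to the source; defaults to none, mirroring the suggested new Filter[0]. */
  default String[] pushedFilters() {
    return new String[0];
  }

  /** Default description: implementation class name, read schema, and any pushed filters. */
  default String description() {
    String base = getClass().getSimpleName() + " [" + readSchema() + "]";
    String[] filters = pushedFilters();
    return filters.length == 0 ? base : base + " PushedFilters: " + Arrays.toString(filters);
  }
}

/** Example implementation that overrides only what it knows: its schema and pushed filters. */
final class ParquetLikeScan implements DescribedScan {
  @Override
  public String readSchema() {
    return "id BIGINT, data STRING";
  }

  @Override
  public String[] pushedFilters() {
    return new String[] {"IsNotNull(id)", "GreaterThan(id,5)"};
  }
}
```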
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237668483 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.v2.reader.Scan; +import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * An interface representing a logical structured data set of a data source. For example, the + * implementation can be a directory on the file system, a topic of Kafka, or a table in the + * catalog, etc. + * + * This interface can mixin the following interfaces to support different operations: + * + * + * {@link SupportsBatchRead}: this table can be read in batch queries. + * + */ +@Evolving +public interface Table { + + /** + * A name to identify this table. + * + * By default this returns the class name of this implementation. Please override it to provide a + * meaningful name, like the database and table name from catalog, or the location of files for + * this table. 
+ * + */ + default String name() { --- End diff -- I don't think this should have a default. Implementations should definitely implement this. I think there is a difference between `toString` and `name`. An implementation may choose to display `name` when showing a table's string representation, but may choose to include extra information to show more about the table state, like Iceberg's snapshot ID.
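The sketch below illustrates the distinction between `name` and `toString`. Here `name()` has no default, so every implementation must supply one. `toString()` is free to add extra table state. `NamedTable` and `SnapshotTable` are hypothetical, and the `name@snapshotId` format is invented for the example; it is not how Iceberg renders tables.

```java
// Hypothetical sketch: name() is abstract, so the compiler forces implementations to provide it.
interface NamedTable {
  String name();
}

final class SnapshotTable implements NamedTable {
  private final String database;
  private final String table;
  private final long snapshotId;

  SnapshotTable(String database, String table, long snapshotId) {
    this.database = database;
    this.table = table;
    this.snapshotId = snapshotId;
  }

  /** Stable identity used to refer to the table. */
  @Override
  public String name() {
    return database + "." + table;
  }

  /** String form that also exposes current state (an invented format). */
  @Override
  public String toString() {
    return name() + "@" + snapshotId;
  }
}
```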
[GitHub] spark pull request #21978: [SPARK-25006][SQL] Add CatalogTableIdentifier.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21978#discussion_r237660050 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala --- @@ -18,48 +18,106 @@ package org.apache.spark.sql.catalyst /** - * An identifier that optionally specifies a database. + * An identifier that optionally specifies a database and catalog. * * Format (unquoted): "name" or "db.name" * Format (quoted): "`name`" or "`db`.`name`" */ -sealed trait IdentifierWithDatabase { +sealed trait IdentifierWithOptionalDatabaseAndCatalog { val identifier: String def database: Option[String] + def catalog: Option[String] + /* * Escapes back-ticks within the identifier name with double-back-ticks. */ private def quoteIdentifier(name: String): String = name.replace("`", "``") def quotedString: String = { -val replacedId = quoteIdentifier(identifier) -val replacedDb = database.map(quoteIdentifier(_)) - -if (replacedDb.isDefined) s"`${replacedDb.get}`.`$replacedId`" else s"`$replacedId`" +// database is required if catalog is present +assert(database.isDefined || catalog.isEmpty) +def q(s: String): String = s"`${quoteIdentifier(s)}`" +Seq(catalog.map(q), database.map(q), Some(q(identifier))).flatten.mkString(".") } def unquotedString: String = { -if (database.isDefined) s"${database.get}.$identifier" else identifier +Seq(catalog, database, Some(identifier)).flatten.mkString(".") } override def toString: String = quotedString } +object CatalogTableIdentifier { + def apply(table: String): CatalogTableIdentifier = +new CatalogTableIdentifier(table, None, None) + + def apply(table: String, database: String): CatalogTableIdentifier = +new CatalogTableIdentifier(table, Some(database), None) + + def apply(table: String, database: String, catalog: String): CatalogTableIdentifier = +new CatalogTableIdentifier(table, Some(database), Some(catalog)) +} + /** - * Identifies a table in a database. 
- * If `database` is not defined, the current database is used. - * When we register a permanent function in the FunctionRegistry, we use - * unquotedString as the function name. + * Identifies a table in a database and catalog. + * If `database` is not defined, the current catalog's default database is used. + * If `catalog` is not defined, the current catalog is used. --- End diff -- Agreed. This introduces the ability to expose a catalog to Spark. It doesn't actually add any user-facing operations.
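The Scala diff above formats an identifier by joining the optional catalog, the optional database, and the table name. Below is a minimal Java sketch of that joining logic. `IdentifierFormat` is a hypothetical helper, not a Spark class. Where the Scala code uses `assert`, the sketch throws `IllegalArgumentException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helper mirroring quotedString/unquotedString from the diff; not a Spark class.
final class IdentifierFormat {
  private IdentifierFormat() {}

  // Escape back-ticks by doubling them, then wrap the name in back-ticks.
  static String quote(String name) {
    return "`" + name.replace("`", "``") + "`";
  }

  static String quotedString(Optional<String> catalog, Optional<String> database, String identifier) {
    // Same invariant as the diff's assert: a catalog is only allowed together with a database.
    if (catalog.isPresent() && !database.isPresent()) {
      throw new IllegalArgumentException("database is required if catalog is present");
    }
    List<String> parts = new ArrayList<>();
    catalog.ifPresent(c -> parts.add(quote(c)));
    database.ifPresent(d -> parts.add(quote(d)));
    parts.add(quote(identifier));
    return String.join(".", parts);
  }

  static String unquotedString(Optional<String> catalog, Optional<String> database, String identifier) {
    List<String> parts = new ArrayList<>();
    catalog.ifPresent(parts::add);
    database.ifPresent(parts::add);
    parts.add(identifier);
    return String.join(".", parts);
  }
}
```

For example, catalog `prod`, database `db`, and table ``t`1`` quote to ``` `prod`.`db`.`t``1` ```.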
[GitHub] spark pull request #21978: [SPARK-25006][SQL] Add CatalogTableIdentifier.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21978#discussion_r237585203 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala --- @@ -18,48 +18,106 @@ package org.apache.spark.sql.catalyst /** - * An identifier that optionally specifies a database. + * An identifier that optionally specifies a database and catalog. * * Format (unquoted): "name" or "db.name" * Format (quoted): "`name`" or "`db`.`name`" */ -sealed trait IdentifierWithDatabase { +sealed trait IdentifierWithOptionalDatabaseAndCatalog { val identifier: String def database: Option[String] + def catalog: Option[String] + /* * Escapes back-ticks within the identifier name with double-back-ticks. */ private def quoteIdentifier(name: String): String = name.replace("`", "``") def quotedString: String = { -val replacedId = quoteIdentifier(identifier) -val replacedDb = database.map(quoteIdentifier(_)) - -if (replacedDb.isDefined) s"`${replacedDb.get}`.`$replacedId`" else s"`$replacedId`" +// database is required if catalog is present +assert(database.isDefined || catalog.isEmpty) +def q(s: String): String = s"`${quoteIdentifier(s)}`" +Seq(catalog.map(q), database.map(q), Some(q(identifier))).flatten.mkString(".") } def unquotedString: String = { -if (database.isDefined) s"${database.get}.$identifier" else identifier +Seq(catalog, database, Some(identifier)).flatten.mkString(".") } override def toString: String = quotedString } +object CatalogTableIdentifier { + def apply(table: String): CatalogTableIdentifier = +new CatalogTableIdentifier(table, None, None) + + def apply(table: String, database: String): CatalogTableIdentifier = +new CatalogTableIdentifier(table, Some(database), None) + + def apply(table: String, database: String, catalog: String): CatalogTableIdentifier = +new CatalogTableIdentifier(table, Some(database), Some(catalog)) +} + /** - * Identifies a table in a database. 
- * If `database` is not defined, the current database is used. - * When we register a permanent function in the FunctionRegistry, we use - * unquotedString as the function name. + * Identifies a table in a database and catalog. + * If `database` is not defined, the current catalog's default database is used. + * If `catalog` is not defined, the current catalog is used. --- End diff -- No, we want to move away from a special global catalog. I think that Spark should have a current catalog, like a current database, which is used to resolve references that don't have an explicit catalog. That would have a default, just like the current database has a default.
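One way to picture a session-level current catalog that works like the current database is the sketch below. It makes two assumptions. `SessionNamespace` is hypothetical and is not a Spark class. It also keeps a single current database instead of tracking each catalog's default database, as the Javadoc in the diff describes.

```java
import java.util.Optional;

// Hypothetical session state; Spark does not expose this class, it only illustrates the idea.
final class SessionNamespace {
  private String currentCatalog;
  private String currentDatabase;

  SessionNamespace(String defaultCatalog, String defaultDatabase) {
    this.currentCatalog = defaultCatalog;
    this.currentDatabase = defaultDatabase;
  }

  // Changes the catalog used for later references that omit one, like USE does for databases.
  void setCurrentCatalog(String catalog) {
    this.currentCatalog = catalog;
  }

  void setCurrentDatabase(String database) {
    this.currentDatabase = database;
  }

  // Fills in whichever parts a reference leaves out, using the session's current values.
  String[] resolve(Optional<String> catalog, Optional<String> database, String table) {
    return new String[] {catalog.orElse(currentCatalog), database.orElse(currentDatabase), table};
  }
}
```

An explicit catalog in a reference always wins. The current catalog only fills the gap, just as the current database does today.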
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21306 @stczwd, thanks for taking a look at this. What are the differences between batch and stream DDL that you think will come up?
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237179854 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala --- @@ -54,27 +53,17 @@ case class DataSourceV2ScanExec( Seq(output, source, options).hashCode() } - override def outputPartitioning: physical.Partitioning = readSupport match { + override def outputPartitioning: physical.Partitioning = scan match { --- End diff -- If you take my suggestion above to inspect the `Scan` to build the string representation of this node, then I think the arguments should be `scan` and `output`. Then the batch can be fetched here. For `pushedFilters`, I think that they should be fetched from the configured scan to build the string representation.
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237178976 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala --- @@ -23,29 +23,28 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} -import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.sources.v2.DataSourceV2 import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport, MicroBatchReadSupport} /** - * Physical plan node for scanning data from a data source. + * Physical plan node for scanning a batch of data from a data source. */ case class DataSourceV2ScanExec( output: Seq[AttributeReference], @transient source: DataSourceV2, @transient options: Map[String, String], --- End diff -- With a catalog, there is no expectation that a `source` will be passed. This could be a string that identifies either the source or the catalog, for a good string representation of the physical plan. This is another area where I think `Table.name` would be helpful because the table's identifying information is really what should be shown instead of its source or catalog. For options, these are part of the scan and aren't used to affect the behavior of this physical node. I think that means that they shouldn't be part of the node's arguments. I think a good way to solve this problem is to change the pretty string format to use `Scan` instead. That has the information that defines what this node is doing, like the filters, projection, and options. And being able to convert a logical scan to text would be useful across all 3 execution modes. 
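Below is a minimal sketch of a physical node that builds its string form from the `Scan` alone, so it needs neither `source` nor `options` as arguments. The types are simplified stand-ins. `describe()` is a hypothetical method; the v2 `Scan` interface in this PR does not define it.

```java
import java.util.List;
import java.util.Map;

// Simplified stand-in for Scan with a hypothetical describe() method; not the real Spark interface.
interface DescribedScan {
  String describe();
}

// Hypothetical source scan that already knows what it reads: table name, pushed filters, options.
final class ExampleScan implements DescribedScan {
  private final String tableName;
  private final List<String> pushedFilters;
  private final Map<String, String> options;

  ExampleScan(String tableName, List<String> pushedFilters, Map<String, String> options) {
    this.tableName = tableName;
    this.pushedFilters = pushedFilters;
    this.options = options;
  }

  @Override
  public String describe() {
    return tableName + " filters=" + pushedFilters + " options=" + options;
  }
}

// The physical node carries only the scan and delegates its string form to it.
final class ScanNodeSketch {
  private final DescribedScan scan;

  ScanNodeSketch(DescribedScan scan) {
    this.scan = scan;
  }

  String simpleString() {
    return "ScanV2 " + scan.describe();
  }
}
```

The same `describe()` could serve batch, micro-batch, and continuous nodes alike, because each of them starts from a `Scan`.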
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237176552 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils -import org.apache.spark.sql.sources.v2.{BatchReadSupportProvider, DataSourceOptions, DataSourceV2} +import org.apache.spark.sql.sources.v2._ --- End diff -- I am using an IDE for this review, but this makes future reviews harder. I realize it isn't a major issue, but I think it is a best practice to not use wildcard imports.
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237176100 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.DataSourceRegister; +import org.apache.spark.sql.types.StructType; + +/** + * The base interface for v2 data sources which don't have a real catalog. Implementations must + * have a public, 0-arg constructor. + * + * The major responsibility of this interface is to return a {@link Table} for read/write. + */ +@Evolving +// TODO: do not extend `DataSourceV2`, after we finish the API refactor completely. +public interface TableProvider extends DataSourceV2 { + + /** + * Return a {@link Table} instance to do read/write with user-specified options. + * + * @param options the user-specified options that can identify a table, e.g. file path, Kafka + *topic name, etc. It's an immutable case-insensitive string-to-string map. 
+ */ + Table getTable(DataSourceOptions options); + + /** + * Return a {@link Table} instance to do read/write with user-specified schema and options. + * + * By default this method throws {@link UnsupportedOperationException}, implementations should --- End diff -- Strange, that page links to one with the opposite advice: http://www.javapractices.com/topic/TopicAction.do?Id=44 I think that `@throws` is a good idea whenever you want to document an exception type as part of the method contract. Since it is expected that this method isn't always implemented and may throw this exception, I think you were right to document it. And documenting exceptions is best done with `@throws` to highlight them in Javadoc. The page you linked to makes the argument that unchecked exceptions aren't part of the method contract and cannot be relied on. But documenting this shows that it is part of the contract or expected behavior, so I think docs are appropriate.
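Below is a minimal sketch of a default method that throws, with the exception documented as part of its contract through `@throws`. The types are simplified stand-ins and all names are hypothetical: `Map<String, String>` stands in for `DataSourceOptions`, and a DDL string stands in for `StructType`.

```java
import java.util.Map;

// Hypothetical stand-in for Table; only the name is modeled.
interface SimpleTable {
  String name();
}

interface SimpleTableProvider {
  /**
   * Returns a table identified by the given options.
   */
  SimpleTable getTable(Map<String, String> options);

  /**
   * Returns a table for the given user-specified schema and options.
   *
   * @throws UnsupportedOperationException if this source does not accept a user-specified schema
   */
  default SimpleTable getTable(String schema, Map<String, String> options) {
    throw new UnsupportedOperationException(
        getClass().getSimpleName() + " does not support user-specified schemas");
  }
}

// Hypothetical provider that only implements the options-based method.
final class PathTableProvider implements SimpleTableProvider {
  @Override
  public SimpleTable getTable(Map<String, String> options) {
    String path = options.get("path");
    return () -> path;
  }
}
```

Because the exception is listed in the Javadoc, callers can treat it as expected behavior for sources that skip the schema-aware overload.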
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237172065 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; --- End diff -- > Can we delay the discussion when we have a PR to add catalog support after the refactor? Yes, that works. But can we move `Table` to the `org.apache.spark.sql.catalog.v2` package where `TableCatalog` is defined in the other PR? I think `Table` should be defined with the catalog API, and moving it later would require import changes to any file that references `Table`.
[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23055#discussion_r237169532 --- Diff: python/pyspark/worker.py --- @@ -22,7 +22,12 @@ import os import sys import time -import resource +# 'resource' is a Unix specific module. +has_resource_module = True +try: +import resource +except ImportError: +has_resource_module = False --- End diff -- I'm -1 on this change. I think the correct behavior is that Python should not fail if `resource` cannot be imported, and the JVM should not do anything differently.
[GitHub] spark issue #23086: [SPARK-25528][SQL] data source v2 API refactor (batch re...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/23086 @cloud-fan, sorry to spread review comments over two days, but I've finished the first pass. Overall, it looks great. I think we can simplify a couple of areas, like all of the args passed to the ScanExec node and its `equals` method. I'd also like to add `name` to `Table` to return an identifying string (even if that is a set of options or paths in some cases). Thanks for working on this!
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236859358 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala --- @@ -396,87 +392,66 @@ object SimpleReaderFactory extends PartitionReaderFactory { } } -abstract class SimpleReadSupport extends BatchReadSupport { - override def fullSchema(): StructType = new StructType().add("i", "int").add("j", "int") - - override def newScanConfigBuilder(): ScanConfigBuilder = { -NoopScanConfigBuilder(fullSchema()) - } +abstract class SimpleBatchTable extends Table with SupportsBatchRead { - override def createReaderFactory(config: ScanConfig): PartitionReaderFactory = { -SimpleReaderFactory - } + override def schema(): StructType = new StructType().add("i", "int").add("j", "int") } +abstract class SimpleScanBuilder extends ScanBuilder + with Batch with Scan { --- End diff -- I like that the API is flexible enough that `ScanBuilder`, `Scan`, and `Batch` can be the same object in simple cases.
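Below is a minimal sketch of one object serving as builder, logical scan, and physical batch, in the spirit of `SimpleScanBuilder` above. The interfaces are simplified stand-ins with reduced signatures, not Spark's actual definitions, and `RangeScan` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the v2 read interfaces; signatures are reduced for illustration.
interface InputPartition {}

interface Batch {
  List<InputPartition> planInputPartitions();
}

interface Scan {
  String readSchema();

  default Batch toBatch() {
    throw new UnsupportedOperationException("Batch scans are not supported");
  }
}

interface ScanBuilder {
  Scan build();
}

// One object plays all three roles.
final class RangeScan implements ScanBuilder, Scan, Batch {
  private final int numPartitions;

  RangeScan(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  @Override
  public Scan build() {
    return this; // nothing to configure, so the builder is also the scan
  }

  @Override
  public String readSchema() {
    return "i INT, j INT";
  }

  @Override
  public Batch toBatch() {
    return this; // the scan is also its own physical batch
  }

  @Override
  public List<InputPartition> planInputPartitions() {
    List<InputPartition> partitions = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new InputPartition() {});
    }
    return partitions;
  }
}
```

More complex sources can still split the three roles into separate classes when the builder needs mutable state for pushdown.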
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236858793 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala --- @@ -116,16 +116,20 @@ object DataSourceV2Strategy extends Strategy { |Output: ${output.mkString(", ")} """.stripMargin) - val scan = DataSourceV2ScanExec( + val batch = scan.toBatch + val partitions = batch.planInputPartitions() + val readerFactory = batch.createReaderFactory() + val plan = DataSourceV2ScanExec( --- End diff -- I mentioned this above, but I think that `DataSourceV2ScanExec` only needs to be passed `output` and `batch`, unless there is a benefit to calling `planInputPartitions` here, like an earlier failure. Is there?
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236858449 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala --- @@ -54,27 +53,17 @@ case class DataSourceV2ScanExec( Seq(output, source, options).hashCode() } - override def outputPartitioning: physical.Partitioning = readSupport match { + override def outputPartitioning: physical.Partitioning = scan match { --- End diff -- Should `SupportsReportPartitioning` extend `Batch` instead of `Scan`? Then this physical node could just be passed the `Batch` and not the `Scan`, `PartitionReaderFactory`, and partitions. In fact, I think that this node only requires `output: Seq[AttributeReference], batch: Batch`.
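Below is a minimal sketch of a scan node constructed from only the output columns and a `Batch`. It assumes a partitioning mix-in that extends `Batch`, as suggested above. It also assumes partitions are planned lazily, once, inside the node rather than eagerly in the strategy. `ReportsPartitioning` and `BatchScanNode` are hypothetical, and strings stand in for Spark's `Partitioning` classes.

```java
import java.util.List;

// Simplified stand-ins, not the real Spark interfaces.
interface InputPartition {}

interface Batch {
  List<InputPartition> planInputPartitions();
}

// Hypothetical: partitioning is reported by the Batch instead of the Scan.
interface ReportsPartitioning extends Batch {
  String outputPartitioning();
}

// A scan node built from just the output columns and the batch.
final class BatchScanNode {
  final List<String> output;
  private final Batch batch;
  private List<InputPartition> partitions; // planned lazily, at most once

  BatchScanNode(List<String> output, Batch batch) {
    this.output = output;
    this.batch = batch;
  }

  List<InputPartition> partitions() {
    if (partitions == null) {
      partitions = batch.planInputPartitions();
    }
    return partitions;
  }

  // Same order of checks as the node in the diff: single partition first, then the mix-in.
  String outputPartitioning() {
    if (partitions().size() == 1) {
      return "SinglePartition";
    } else if (batch instanceof ReportsPartitioning) {
      return ((ReportsPartitioning) batch).outputPartitioning();
    }
    return "UnknownPartitioning";
  }
}
```

The trade-off raised above still applies: planning lazily delays any failure in `planInputPartitions` until the node first needs its partitions.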
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236858107 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala --- @@ -23,29 +23,28 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} -import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.sources.v2.DataSourceV2 import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport, MicroBatchReadSupport} /** - * Physical plan node for scanning data from a data source. + * Physical plan node for scanning a batch of data from a data source. */ case class DataSourceV2ScanExec( output: Seq[AttributeReference], @transient source: DataSourceV2, @transient options: Map[String, String], --- End diff -- Similarly, options were used to create the `Scan` so they don't need to be passed here if they are not used in `equals` and `hashCode`.
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236857220 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala --- @@ -23,29 +23,28 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} -import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.sources.v2.DataSourceV2 import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport, MicroBatchReadSupport} /** - * Physical plan node for scanning data from a data source. + * Physical plan node for scanning a batch of data from a data source. */ case class DataSourceV2ScanExec( output: Seq[AttributeReference], @transient source: DataSourceV2, --- End diff -- I think we can remove `source` by updating `equals` and `hashCode` to check just the `Scan`.
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236856960 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala --- @@ -23,29 +23,28 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} -import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.sources.v2.DataSourceV2 import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport, MicroBatchReadSupport} /** - * Physical plan node for scanning data from a data source. + * Physical plan node for scanning a batch of data from a data source. */ case class DataSourceV2ScanExec( output: Seq[AttributeReference], @transient source: DataSourceV2, @transient options: Map[String, String], @transient pushedFilters: Seq[Expression], -@transient readSupport: ReadSupport, -@transient scanConfig: ScanConfig) +@transient scan: Scan, +@transient partitions: Array[InputPartition], +@transient readerFactory: PartitionReaderFactory) extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan { override def simpleString: String = "ScanV2 " + metadataString // TODO: unify the equal/hashCode implementation for all data source v2 query plans. override def equals(other: Any): Boolean = other match { -case other: DataSourceV2ScanExec => - output == other.output && readSupport.getClass == other.readSupport.getClass && +case other: DataSourceV2StreamingScanExec => + output == other.output && source.getClass == other.source.getClass && --- End diff -- Should this implement identity instead of equality? When would two ScanExec nodes be equal instead of identical? 
Also, I don't think that this `equals` implementation is correct. First, it should not check for the streaming class. Second, it should check whether the scan is equal, not whether the options and the source are the same (plus, `source` will be removed). Unfortunately, implementing true equality (not just identity) must in some way rely on a user-supplied class. A scan is the same if it will produce the same set of rows and the same columns in those rows. That means equality depends on the filter, the projection, and the source data (i.e., the table). We can use `pushedFilters` and `output` for the filter and projection. But checking that the source data is the same requires either using the scan's `equals` method (which would also satisfy the filter and projection checks) or checking that the partitions are the same. Both `Scan` and `InputPartition` implementations are provided by sources, so they may not override `equals`. Because this must depend on checking equality of user-supplied objects, I think it would be much easier to make this depend only on equality of the `Scan`, with `hashCode` updated to match:
```
override def equals(other: Any): Boolean = other match {
  case scanExec: DataSourceV2ScanExec => scanExec.scan == this.scan
  case _ => false
}

override def hashCode(): Int = scan.hashCode()
```
That may fall back to identity if the source hasn't implemented `equals`, but I don't see a way to avoid it.
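The sketch below shows node equality delegated entirely to the source-supplied scan, with `hashCode` derived from the same field. It is minimal and uses hypothetical classes. `ValueScan` defines value equality over the table, pushed filters, and projection. `OpaqueScan` defines nothing, so comparisons fall back to identity. Anything that is not a scan node compares unequal instead of failing.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for the physical scan node; `scan` is whatever object the source supplied.
final class ScanExecSketch {
  final List<String> output;
  final Object scan;

  ScanExecSketch(List<String> output, Object scan) {
    this.output = output;
    this.scan = scan;
  }

  @Override
  public boolean equals(Object other) {
    if (!(other instanceof ScanExecSketch)) {
      return false;
    }
    return scan.equals(((ScanExecSketch) other).scan);
  }

  @Override
  public int hashCode() {
    return scan.hashCode(); // derived from the same field as equals, so the two stay consistent
  }
}

// A source scan that defines value equality over the table, pushed filters, and projection.
final class ValueScan {
  final String table;
  final List<String> filters;
  final List<String> columns;

  ValueScan(String table, List<String> filters, List<String> columns) {
    this.table = table;
    this.filters = filters;
    this.columns = columns;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof ValueScan)) {
      return false;
    }
    ValueScan s = (ValueScan) o;
    return table.equals(s.table) && filters.equals(s.filters) && columns.equals(s.columns);
  }

  @Override
  public int hashCode() {
    return Objects.hash(table, filters, columns);
  }
}

// A source scan without equals/hashCode overrides: comparison falls back to identity.
final class OpaqueScan {}
```

Two nodes over separately built but equivalent `ValueScan`s compare equal. Two nodes over distinct `OpaqueScan` instances do not.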
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236852153 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -170,15 +157,24 @@ object DataSourceV2Relation { } def create( - source: DataSourceV2, + provider: TableProvider, + table: SupportsBatchRead, options: Map[String, String], tableIdent: Option[TableIdentifier] = None, userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = { -val readSupport = source.createReadSupport(options, userSpecifiedSchema) -val output = readSupport.fullSchema().toAttributes +val output = table.schema().toAttributes val ident = tableIdent.orElse(tableFromOptions(options)) DataSourceV2Relation( - source, readSupport, output, options, ident, userSpecifiedSchema) + provider, table, output, options, ident, userSpecifiedSchema) + } + + def createRelationForWrite( --- End diff -- Could you also note that this is temporary until the write side is finished?
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236850263 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.v2.reader.Scan; +import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * An interface representing a logical structured data set of a data source. For example, the + * implementation can be a directory on the file system, a topic of Kafka, or a table in the + * catalog, etc. + * + * This interface can mixin the following interfaces to support different operations: + * + * {@link SupportsBatchRead}: this table can be read in batch queries. + * + */ +@Evolving +public interface Table { --- End diff -- It would be helpful for a `Table` to also expose a name or identifier of some kind. The `TableIdentifier` passed into `DataSourceV2Relation` is only used in `name` to identify the relation's table. 
If the name (or location for path-based tables) were supplied by the table instead, it would remove the need to pass it to the relation.
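Below is a minimal sketch of a table that reports its own identifying string, so the relation can ask the table for its name instead of taking a separate identifier. `name()` is a hypothetical addition to a simplified `Table` stand-in. A path-based table uses its location, and a catalog table uses its qualified identifier.

```java
// Simplified stand-in for Table with the proposed name() method; the method is hypothetical here.
interface NamedTable {
  String name();
}

// Hypothetical path-based table: without a catalog identifier, its location identifies it.
final class PathTable implements NamedTable {
  private final String path;

  PathTable(String path) {
    this.path = path;
  }

  @Override
  public String name() {
    return path;
  }
}

// Hypothetical catalog table that reports its qualified identifier.
final class CatalogTable implements NamedTable {
  private final String database;
  private final String table;

  CatalogTable(String database, String table) {
    this.database = database;
    this.table = table;
  }

  @Override
  public String name() {
    return database + "." + table;
  }
}

// The relation asks the table for its name instead of taking a separate identifier argument.
final class RelationSketch {
  private final NamedTable table;

  RelationSketch(NamedTable table) {
    this.table = table;
  }

  String name() {
    return table.name();
  }
}
```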
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236849290 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -40,8 +40,8 @@ import org.apache.spark.sql.types.StructType * @param userSpecifiedSchema The user-specified schema for this scan. */ case class DataSourceV2Relation( -source: DataSourceV2, -readSupport: BatchReadSupport, +source: TableProvider, --- End diff -- You may want to note that `TableProvider` will be removed when the write side is finished, since it is only used for `createWriteSupport`, which will be exposed through `Table`.
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236844174 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils -import org.apache.spark.sql.sources.v2.{BatchReadSupportProvider, DataSourceOptions, DataSourceV2} +import org.apache.spark.sql.sources.v2._ --- End diff -- Nit: using wildcard imports makes it harder to review without an IDE because it is more difficult to find out where symbols come from.
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236823417 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.sources.v2.SupportsBatchRead; +import org.apache.spark.sql.sources.v2.Table; + +/** + * A logical representation of a data source scan. This interface is used to provide logical + * information, like what the actual read schema is. + * + * This logical representation is shared between batch scan, micro-batch streaming scan and + * continuous streaming scan. Data sources must implement the corresponding methods in this + * interface, to match what the table promises to support. For example, {@link #toBatch()} must be + * implemented, if the {@link Table} that creates this {@link Scan} implements + * {@link SupportsBatchRead}. 
+ */ +@Evolving +public interface Scan { + + /** + * Returns the actual schema of this data source scan, which may be different from the physical + * schema of the underlying storage, as column pruning or other optimizations may happen. + */ + StructType readSchema(); + + /** + * Returns the physical representation of this scan for batch query. By default this method throws + * exception, data sources must overwrite this method to provide an implementation, if the + * {@link Table} that creates this scan implements {@link SupportsBatchRead}. + */ + default Batch toBatch() { +throw new UnsupportedOperationException("Do not support batch scan."); --- End diff -- Nit: the message should be "Batch scans are not supported". Starting with "Do not" makes the sentence a command.
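Below is a minimal sketch of the `toBatch()` default with the reworded message. It also shows the rule in the Javadoc: a scan whose table supports batch reads overrides the default, and a streaming-only scan inherits it. The interfaces are simplified stand-ins. `readSchema()` returns a DDL string, `numPartitions()` replaces partition planning, and `StreamOnlyScan` and `FileScan` are hypothetical.

```java
// Simplified stand-ins for Scan and Batch; not Spark's actual interfaces.
interface Batch {
  int numPartitions(); // hypothetical simplification of planInputPartitions()
}

interface Scan {
  String readSchema();

  // Default for tables that do not implement SupportsBatchRead.
  default Batch toBatch() {
    throw new UnsupportedOperationException("Batch scans are not supported");
  }
}

// Hypothetical scan from a streaming-only table: it keeps the throwing default.
final class StreamOnlyScan implements Scan {
  @Override
  public String readSchema() {
    return "value STRING";
  }
}

// Hypothetical scan from a table that supports batch reads, so it overrides toBatch().
final class FileScan implements Scan {
  @Override
  public String readSchema() {
    return "i INT, j INT";
  }

  @Override
  public Batch toBatch() {
    return () -> 4; // four input partitions, an arbitrary example value
  }
}
```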
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236820896 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.Evolving; + +/** + * A physical representation of a data source scan for batch queries. This interface is used to + * provide physical information, like how many partitions the scanned data has, and how to read + * records from the partitions. + */ +@Evolving +public interface Batch { + + /** + * Returns a list of {@link InputPartition input partitions}. Each {@link InputPartition} + * represents a data split that can be processed by one Spark task. The number of input + * partitions returned here is the same as the number of RDD partitions this scan outputs. + * + * Note that, this may not be a full scan if the data source supports optimization like filter + * push-down. Implementations should check the status of {@link Scan} that creates this batch, + * and adjust the resulting {@link InputPartition input partitions}. 
--- End diff -- I think this is a little unclear. Implementations do not necessarily check the scan. This Batch is likely configured with a filter and is responsible for creating splits for that filter.
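The alternative described above can be sketched in Python for brevity. This mirrors the idea, not the Java `Batch` API: the batch is handed the pushed-down filter when it is created and plans only the splits that can match it, without looking back at the `Scan`. `FileSplit`, `FilteredBatch`, `plan_input_partitions`, and the per-file min/max statistics are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class FileSplit:
    """Hypothetical data split: one file plus min/max stats for its `id` column."""
    path: str
    min_id: int
    max_id: int


class FilteredBatch:
    """Hypothetical batch that is handed the pushed-down filter when it is built."""

    def __init__(self, splits: List[FileSplit], id_lower_bound: Optional[int] = None):
        self._splits = list(splits)
        # The pushed filter (id >= id_lower_bound) is captured here, so planning
        # never needs to inspect the scan that created this batch.
        self._id_lower_bound = id_lower_bound

    def plan_input_partitions(self) -> List[FileSplit]:
        if self._id_lower_bound is None:
            return list(self._splits)  # no filter was pushed: full scan
        # Skip splits whose stats show that no row can match the filter.
        return [s for s in self._splits if s.max_id >= self._id_lower_bound]
```

With splits covering ids 0-9, 10-19, and 20-29, a batch built with `id_lower_bound=15` plans only the last two.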
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236820065 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.DataSourceRegister; +import org.apache.spark.sql.types.StructType; + +/** + * The base interface for v2 data sources which don't have a real catalog. Implementations must + * have a public, 0-arg constructor. + * + * The major responsibility of this interface is to return a {@link Table} for read/write. + */ +@Evolving +// TODO: do not extend `DataSourceV2`, after we finish the API refactor completely. +public interface TableProvider extends DataSourceV2 { + + /** + * Return a {@link Table} instance to do read/write with user-specified options. + * + * @param options the user-specified options that can identify a table, e.g. file path, Kafka + *topic name, etc. It's an immutable case-insensitive string-to-string map. 
+ */ + Table getTable(DataSourceOptions options); + + /** + * Return a {@link Table} instance to do read/write with user-specified schema and options. + * --- End diff -- Minor: Javadoc doesn't automatically parse empty lines as new paragraphs. If you want a paragraph break in the documentation, use `<p>`.
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236819758 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.DataSourceRegister; +import org.apache.spark.sql.types.StructType; + +/** + * The base interface for v2 data sources which don't have a real catalog. Implementations must + * have a public, 0-arg constructor. + * + * The major responsibility of this interface is to return a {@link Table} for read/write. + */ +@Evolving +// TODO: do not extend `DataSourceV2`, after we finish the API refactor completely. +public interface TableProvider extends DataSourceV2 { + + /** + * Return a {@link Table} instance to do read/write with user-specified options. + * + * @param options the user-specified options that can identify a table, e.g. file path, Kafka + *topic name, etc. It's an immutable case-insensitive string-to-string map. 
+ */ + Table getTable(DataSourceOptions options); + + /** + * Return a {@link Table} instance to do read/write with user-specified schema and options. + * + * By default this method throws {@link UnsupportedOperationException}, implementations should --- End diff -- Javadoc would normally also add `@throws` with this information. I agree it should be here as well.
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236818511 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.DataSourceRegister; +import org.apache.spark.sql.types.StructType; + +/** + * The base interface for v2 data sources which don't have a real catalog. Implementations must + * have a public, 0-arg constructor. + * + * The major responsibility of this interface is to return a {@link Table} for read/write. + */ +@InterfaceStability.Evolving +// TODO: do not extend `DataSourceV2`, after we finish the API refactor completely. +public interface TableProvider extends DataSourceV2 { + + /** + * Return a {@link Table} instance to do read/write with user-specified options. + * + * @param options the user-specified options that can identify a table, e.g. file path, Kafka + *topic name, etc. It's an immutable case-insensitive string-to-string map. 
+ */ + Table getTable(DataSourceOptions options); + + /** + * Return a {@link Table} instance to do read/write with user-specified schema and options. + * + * By default this method throws {@link UnsupportedOperationException}, implementations should + * override this method to handle user-specified schema. + * + * @param options the user-specified options that can identify a table, e.g. file path, Kafka + *topic name, etc. It's an immutable case-insensitive string-to-string map. + * @param schema the user-specified schema. + */ + default Table getTable(DataSourceOptions options, StructType schema) { --- End diff -- I agree with @cloud-fan. These are slightly different uses. Here, it is supplying a schema for how to interpret data files. Say you have CSV files with columns `id`, `ts`, and `data` and no headers. This tells the CSV reader what the columns are and how to convert the data to useful types (bigint, timestamp, and string). Column projection will later request those columns, maybe just `id` and `data`. If you only passed the projection schema, then the `ts` values would be returned for the `data` column.
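A minimal Python sketch of the CSV example, assuming header-less files whose columns are, in order, `id`, `ts`, and `data`. The full data schema is applied positionally to each record, and projection happens afterwards by name; reading with only the projected schema shifts the `ts` value into `data`. `DATA_SCHEMA`, `read_rows`, and `project` are hypothetical helpers, not Spark APIs.

```python
import csv
import io
from datetime import datetime
from typing import Any, Callable, Dict, List, Sequence, Tuple

# Full data schema for the header-less files: (name, converter) in file order.
DATA_SCHEMA: List[Tuple[str, Callable[[str], Any]]] = [
    ("id", int),
    ("ts", datetime.fromisoformat),
    ("data", str),
]


def read_rows(text: str, schema: Sequence[Tuple[str, Callable[[str], Any]]]) -> List[Dict[str, Any]]:
    """Interpret each CSV record positionally using the given schema."""
    rows = []
    for record in csv.reader(io.StringIO(text)):
        # zip() pairs schema fields with values by position only.
        rows.append({name: convert(value) for (name, convert), value in zip(schema, record)})
    return rows


def project(rows: List[Dict[str, Any]], columns: Sequence[str]) -> List[Dict[str, Any]]:
    """Column projection, applied after the rows were interpreted with the full schema."""
    return [{c: row[c] for c in columns} for row in rows]
```

Projecting `["id", "data"]` from rows read with `DATA_SCHEMA` returns the right values, while calling `read_rows` with only `[("id", int), ("data", str)]` puts the timestamp text in `data`.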
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236816739 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; --- End diff -- I can understand wanting to keep everything in Catalyst private. That's fine with me, but I think that Catalyst does need to be able to interact with tables and catalogs that are supplied by users. For example: Our tables support schema evolution. Specifically, reading files that were written before a column was added. When we add a column, Spark shouldn't start failing in analysis for an AppendData operation in a scheduled job (as it would today). We need to be able to signal to the validation rule that the table supports reading files that are missing columns, so that Spark can do the right validation and allow writes that used to work to continue. How would that information -- support for reading missing columns -- be communicated to the analyzer? 
Also, what about my example above: how will the analyzer load tables using a user-supplied catalog if catalyst can't use any user-supplied implementations? We could move all of the v2 analysis rules, like `ResolveRelations`, into the core module, but it seems to me that this requirement no longer provides value if we have to do that. I think that catalyst is the right place for common plans and analysis rules to live because it is the library of common SQL components. Wherever the rules and plans end up, they will need access to the `TableCatalog` API.
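A hypothetical Python sketch of the schema-evolution case: a table advertises that it can read data missing some columns, and an append validation rule consults that flag instead of failing analysis. `TableInfo.accepts_missing_columns` and `validate_append` are illustrative names only; they are not part of Spark or of the proposed API.

```python
from dataclasses import dataclass
from typing import List, Sequence, Tuple


@dataclass(frozen=True)
class Column:
    name: str
    data_type: str
    nullable: bool = True


@dataclass(frozen=True)
class TableInfo:
    """Hypothetical table metadata that the analyzer can inspect."""
    columns: Tuple[Column, ...]
    # Hypothetical capability flag: the table can read data that lacks some of
    # its nullable columns, e.g. files written before a column was added.
    accepts_missing_columns: bool = False


def validate_append(table: TableInfo, incoming: Sequence[Column]) -> List[str]:
    """Return validation errors for appending data with `incoming` columns."""
    errors = []
    by_name = {c.name: c for c in incoming}
    for col in table.columns:
        source = by_name.get(col.name)
        if source is None:
            # A missing column is acceptable only if the table says it can
            # fill it in (as null) when reading.
            if not (table.accepts_missing_columns and col.nullable):
                errors.append(f"missing column: {col.name}")
        elif source.data_type != col.data_type:
            errors.append(f"type mismatch for {col.name}: "
                          f"{source.data_type} vs {col.data_type}")
    return errors
```

A scheduled job that still writes only `id` and `data` keeps passing after a nullable `category` column is added, but only for a table that sets the flag.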
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236796331 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.v2.reader.Scan; +import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * An interface representing a logical structured data set of a data source. For example, the + * implementation can be a directory on the file system, a topic of Kafka, or a table in the + * catalog, etc. + * + * This interface can mixin the following interfaces to support different operations: + * + * {@link SupportsBatchRead}: this table can be read in batch queries. + * + */ +@Evolving +public interface Table { + + /** + * Returns the schema of this table. + */ + StructType schema(); + + /** + * Returns a {@link ScanBuilder} which can be used to build a {@link Scan} later. Spark will call + * this method for each data scanning query. 
+ * + * The builder can take some query specific information to do operators pushdown, and keep these + * information in the created {@link Scan}. + */ + ScanBuilder newScanBuilder(DataSourceOptions options); --- End diff -- You can add the class either in a follow-up or in this PR. Either way works for me.
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236491385 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.v2.reader.Scan; +import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * An interface representing a logical structured data set of a data source. For example, the + * implementation can be a directory on the file system, a topic of Kafka, or a table in the + * catalog, etc. + * + * This interface can mixin the following interfaces to support different operations: + * + * {@link SupportsBatchRead}: this table can be read in batch queries. + * + */ +@Evolving +public interface Table { + + /** + * Returns the schema of this table. + */ + StructType schema(); + + /** + * Returns a {@link ScanBuilder} which can be used to build a {@link Scan} later. Spark will call + * this method for each data scanning query. 
+ * + * The builder can take some query specific information to do operators pushdown, and keep these + * information in the created {@link Scan}. + */ + ScanBuilder newScanBuilder(DataSourceOptions options); --- End diff -- `DataSourceOptions` isn't simply a map for two main reasons that I can tell: first, it forces options to be case insensitive, and second, it exposes helper methods to identify tables, like `tableName`, `databaseName`, and `paths`. In the new abstraction, the second use of `DataSourceOptions` is no longer needed. The table is already instantiated by the time that this is called. We should reconsider `DataSourceOptions`. The `tableName` methods aren't needed, and we also no longer need to forward properties from the session config because the way tables are configured has changed (catalogs handle that). I think we should remove this class and instead use the more direct implementation, `CaseInsensitiveStringMap` from #21306. The behavior of that class is obvious from its name and it would be shared between the v2 APIs, both catalog and data source.
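For reference, the behavior that the name `CaseInsensitiveStringMap` implies can be sketched as an immutable mapping whose lookups ignore key case. This is a Python illustration under that assumption, not the class from #21306. Rejecting keys that differ only in case but carry different values is a choice made in this sketch.

```python
from collections.abc import Mapping
from typing import Dict, Iterator


class CaseInsensitiveOptions(Mapping):
    """Immutable string-to-string map whose key lookups ignore case.

    Keys are normalized to lower case on construction; the original
    spelling of each key is not preserved.
    """

    def __init__(self, options: Dict[str, str]):
        normalized: Dict[str, str] = {}
        for key, value in options.items():
            lower = key.lower()
            if lower in normalized and normalized[lower] != value:
                raise ValueError(f"conflicting values for option {key!r}")
            normalized[lower] = value
        self._options = normalized

    def __getitem__(self, key: str) -> str:
        return self._options[key.lower()]

    def __iter__(self) -> Iterator[str]:
        return iter(self._options)

    def __len__(self) -> int:
        return len(self._options)
```

Because it subclasses `Mapping`, `get`, `in`, and `keys()` all go through the case-insensitive `__getitem__` or the normalized keys, and there are no mutating methods.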
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236487464 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; --- End diff -- #21306 (TableCatalog support) adds this class as `org.apache.spark.sql.catalog.v2.Table` in the `spark-catalyst` module. I think it needs to be in the catalyst module and should probably be in the `o.a.s.sql.catalog.v2` package as well. The important one is moving this to the catalyst module. The analyzer is in catalyst and all of the v2 logical plans and analysis rules will be in catalyst as well, because we are standardizing behavior. The standard validation rules should be in catalyst, not in a source-specific or hive-specific package in the sql-core or hive modules. Because the logical plans and validation rules are in the catalyst package, the `TableCatalog` API needs to be there as well. 
For example, when a [catalog table identifier](https://github.com/apache/spark/pull/21978) is resolved for a read query, one of the results is a `TableCatalog` instance for the catalog portion of the identifier. That catalog is used to load the v2 table, which is then wrapped in a v2 relation for further analysis. Similarly, the write path should also validate that the catalog exists during analysis by loading it, and would then pass the catalog in a v2 logical plan for `CreateTable` or `CreateTableAsSelect`. I also think that it makes sense to use the `org.apache.spark.sql.catalog.v2` package for `Table` because `Table` is more closely tied to the `TableCatalog` API than to the data source API. The link to DSv2 is that `Table` carries `newScanBuilder`, but the rest of the methods exposed by `Table` are for catalog functions, like inspecting a table's partitioning or table properties. Moving this class would make adding `TableCatalog` less intrusive.
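The read-path resolution flow described above, sketched in Python with hypothetical interfaces. `TableCatalog.load_table`, `InMemoryCatalog`, `Relation`, and `resolve_relation` are illustrative and do not reproduce the signatures in #21306. The first identifier part selects a catalog if one is registered under that name; that catalog loads the table, and the result is wrapped in a relation for further analysis.

```python
from dataclasses import dataclass
from typing import Dict, Sequence, Tuple


@dataclass(frozen=True)
class Table:
    name: str
    schema: Tuple[str, ...]


class TableCatalog:
    """Hypothetical minimal catalog interface: load a table by identifier."""

    def load_table(self, ident: Tuple[str, ...]) -> Table:
        raise NotImplementedError


class InMemoryCatalog(TableCatalog):
    def __init__(self, tables: Dict[Tuple[str, ...], Table]):
        self._tables = tables

    def load_table(self, ident: Tuple[str, ...]) -> Table:
        if ident not in self._tables:
            raise LookupError(f"table not found: {'.'.join(ident)}")
        return self._tables[ident]


@dataclass(frozen=True)
class Relation:
    """Stands in for a v2 relation node that wraps the loaded table."""
    catalog: TableCatalog
    table: Table


def resolve_relation(parts: Sequence[str], catalogs: Dict[str, TableCatalog],
                     default_catalog: str) -> Relation:
    """Split a multi-part identifier into catalog + table identifier, then load."""
    if len(parts) > 1 and parts[0] in catalogs:
        catalog_name, ident = parts[0], tuple(parts[1:])
    else:
        catalog_name, ident = default_catalog, tuple(parts)
    catalog = catalogs[catalog_name]   # catalog portion of the identifier
    table = catalog.load_table(ident)  # the catalog loads the v2 table
    return Relation(catalog, table)    # wrapped for further analysis
```

Resolving `prod.db.events` goes to the catalog registered as `prod`; `db.events` falls back to the default catalog.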
[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23055#discussion_r236480711 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -74,8 +74,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) // each python worker gets an equal part of the allocation. the worker pool will grow to the // number of concurrent tasks, which is determined by the number of cores in this executor. - private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY) + private val memoryMb = if (Utils.isWindows) { --- End diff -- The documentation should read "For platforms where the `resource` API is available, python will limit its resource usage". The allocation on the JVM side is still the correct behavior.
[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23055#discussion_r236345625 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -74,8 +74,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) // each python worker gets an equal part of the allocation. the worker pool will grow to the // number of concurrent tasks, which is determined by the number of cores in this executor. - private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY) + private val memoryMb = if (Utils.isWindows) { --- End diff -- > functionality is disabled in Python side The only functionality that is disabled is limiting the memory space. The allocation for Python is still requested from resource managers. Setting the environment property tells python how much memory it was allocated, no matter how that is used or enforced. > code consistency - usually the configuration is dealt with in JVM side if possible The JVM is handling the setting by requesting that memory for python and passing on the amount requested to python. The fact that the python process can't limit doesn't affect how the JVM side should behave. This needlessly couples JVM and python behavior with an assumption that may not be true in the future. > Why are you so against about disabling it in JVM side? There is no benefit to disabling this. It is more code with no purpose and it makes assumptions about what python can or cannot do that aren't obvious. What if pandas implements some method to spill to disk to limit memory consumption? Will implementers of that future feature know that the environment variable is not set when running in windows? This adds complexity for no benefit because it doesn't change either the resource allocation in the JVM or the behavior of the python process. It only avoids sending valuable information. I see no reason for it. 
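A Python sketch of the JVM-side behavior argued for here (the real code is Scala in `PythonRunner.scala`). Each worker's share follows the diff's comment that every Python worker gets an equal part of the allocation and the pool grows to the number of cores, and the share is reported to the worker on every platform. The environment variable name comes from the `worker.py` diff; `per_worker_memory_mb` and `worker_env` are hypothetical helpers.

```python
from typing import Dict, Mapping, Optional


def per_worker_memory_mb(pyspark_memory_mb: Optional[int], executor_cores: int) -> Optional[int]:
    """Each Python worker gets an equal share; the pool grows to one worker per core."""
    if pyspark_memory_mb is None:
        return None
    return pyspark_memory_mb // max(executor_cores, 1)


def worker_env(base_env: Mapping[str, str], pyspark_memory_mb: Optional[int],
               executor_cores: int) -> Dict[str, str]:
    """Build the worker environment.

    There is deliberately no platform check: the allocation is reported on
    every OS, and the worker decides whether it can enforce it.
    """
    env = dict(base_env)
    share = per_worker_memory_mb(pyspark_memory_mb, executor_cores)
    if share is not None:
        env["PYSPARK_EXECUTOR_MEMORY_MB"] = str(share)
    return env
```

Keeping the OS check out of this function is the point of the argument: the same memory is requested from the resource manager either way, and only the worker knows whether it can self-limit.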
[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23055#discussion_r235082191 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -74,8 +74,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) // each python worker gets an equal part of the allocation. the worker pool will grow to the // number of concurrent tasks, which is determined by the number of cores in this executor. - private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY) + private val memoryMb = if (Utils.isWindows) { --- End diff -- There is no configuration change needed on the JVM side. The JVM should communicate to Python how much memory it is allocated. If Python can limit itself to that amount, then that's fine. If the JVM doesn't expect Python to be able to limit itself, why would it not tell Python how much memory it was allocated? There is no benefit to making this change that I can see.
[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23055#discussion_r234691652 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -74,8 +74,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) // each python worker gets an equal part of the allocation. the worker pool will grow to the // number of concurrent tasks, which is determined by the number of cores in this executor. - private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY) + private val memoryMb = if (Utils.isWindows) { --- End diff -- @HyukjinKwon, what should the JVM side do differently if `resource` is not available? I don't think it should do anything differently. It should still allocate the python memory region when requesting resources from schedulers. The only difference is that python isn't self-limiting. Do you have an example of something that the JVM should change when running on Windows?
[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/22547 I agree that there is consensus for the proposal in the design doc and I don't think there are any blockers. If there's something I can do to help, please let me know. Otherwise ping me to review!
[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23055#discussion_r234286173 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -74,8 +74,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) // each python worker gets an equal part of the allocation. the worker pool will grow to the // number of concurrent tasks, which is determined by the number of cores in this executor. - private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY) + private val memoryMb = if (Utils.isWindows) { --- End diff -- My point is that if `resource` can't be loaded for any reason, the code shouldn't fail. As it is, if `resource` can't be loaded then that is handled, but if the memory limit is set then the worker will still try to use it. That's what I think is brittle. There should be a flag for whether to attempt to use the `resource` API, based on whether it was loaded. If the worker operates as I described, then why make any changes on the JVM side? Why avoid telling the worker how much memory it has?
[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23055#discussion_r234084002 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -74,8 +74,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) // each python worker gets an equal part of the allocation. the worker pool will grow to the // number of concurrent tasks, which is determined by the number of cores in this executor. - private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY) + private val memoryMb = if (Utils.isWindows) { --- End diff -- I mean that it is brittle to try to use `resource` if the JVM has set the property. You handle the `ImportError`, but the JVM could set the request and Python would break again. I think that this should not be entirely disabled on Windows. Resource requests to YARN or other schedulers should include this memory. The only feature that should be disabled is the resource limiting on the python side.
[GitHub] spark issue #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pyspark.me...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/23055 Thanks for fixing this so quickly, @HyukjinKwon! I'd like a couple of changes, but overall it is going in the right direction. We should also plan on porting this to the 2.4 branch when it is committed since it is a regression.
[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23055#discussion_r234080578 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -74,8 +74,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) // each python worker gets an equal part of the allocation. the worker pool will grow to the // number of concurrent tasks, which is determined by the number of cores in this executor. - private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY) + private val memoryMb = if (Utils.isWindows) { --- End diff -- I don't think this is necessary. If `resource` can't be imported for any reason, then memory will not be limited in python. But the JVM side shouldn't be what determines whether that happens. The JVM should do everything the same way -- even requesting memory from schedulers like YARN because that space should still be allocated as python memory, even if python can't self-limit.
[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/23055#discussion_r234080290 --- Diff: python/pyspark/worker.py --- @@ -268,9 +272,11 @@ def main(infile, outfile): # set up memory limits memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1")) -total_memory = resource.RLIMIT_AS -try: -if memory_limit_mb > 0: +# 'PYSPARK_EXECUTOR_MEMORY_MB' should be undefined on Windows because it depends on +# resource package which is a Unix specific package. +if memory_limit_mb > 0: --- End diff -- It seems brittle to disable this on the JVM side and rely on it here. Can we also set a flag in the `ImportError` case and check that here?
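A minimal Python sketch of the flag suggested above, assuming the worker checks both the env var and whether `resource` imported. The names `has_resource_module`, `apply_memory_limit`, and the injectable `set_limit` callback are hypothetical, not PySpark's actual code; the JVM side is left out entirely, matching the point that it should behave the same on every platform.

```python
# Hypothetical sketch: the worker records whether the Unix-only `resource`
# module imported, instead of relying on the JVM to leave the env var unset.
try:
    import resource
    has_resource_module = True
except ImportError:  # e.g. on Windows
    resource = None
    has_resource_module = False


def _set_address_space_limit(new_limit):
    """Lower the soft RLIMIT_AS to new_limit bytes; never raise it."""
    soft, hard = resource.getrlimit(resource.RLIMIT_AS)
    if soft == resource.RLIM_INFINITY or new_limit < soft:
        resource.setrlimit(resource.RLIMIT_AS, (new_limit, hard))


def apply_memory_limit(env, set_limit=None):
    """Return the requested limit in bytes, or None if limiting is skipped."""
    memory_limit_mb = int(env.get("PYSPARK_EXECUTOR_MEMORY_MB", "-1"))
    # Both must hold: a limit was passed in AND this platform can enforce it.
    # This check lives only on the Python side.
    if memory_limit_mb <= 0 or not has_resource_module:
        return None
    new_limit = memory_limit_mb * 1024 * 1024
    (set_limit or _set_address_space_limit)(new_limit)
    return new_limit
```

In a worker this would be called as `apply_memory_limit(os.environ)`; the `set_limit` parameter only exists so the decision can be exercised without changing the process's real limits.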
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r231707076 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.types.DataType; + +/** + * TableChange subclasses represent requested changes to a table. These are passed to + * {@link TableCatalog#alterTable}. For example, + * + * import TableChange._ + * val catalog = source.asInstanceOf[TableSupport].catalog() + * catalog.alterTable(ident, + * addColumn("x", IntegerType), + * renameColumn("a", "b"), + * deleteColumn("c") + * ) + * + */ +public interface TableChange { --- End diff -- @mccheah, our table format supports updating the partitioning of a table, so I think it should be supported. But this is intended to be an initial API, so I didn't want to block it on agreeing how to repartition a table.
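The Javadoc example passes a sequence of change objects to `alterTable`. A hypothetical Python sketch of that pattern, using a plain dict (column name to type name) in place of Spark's `StructType`; the class names only mirror `addColumn`/`renameColumn`/`deleteColumn`, and a partitioning change would simply be one more change type added later.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class AddColumn:
    name: str
    data_type: str


@dataclass(frozen=True)
class RenameColumn:
    name: str
    new_name: str


@dataclass(frozen=True)
class DeleteColumn:
    name: str


def alter_table(schema: Dict[str, str], *changes) -> Dict[str, str]:
    """Apply changes in order to a copy of `schema` (column name -> type name)."""
    result = dict(schema)
    for change in changes:
        if isinstance(change, AddColumn):
            if change.name in result:
                raise ValueError(f"column already exists: {change.name}")
            result[change.name] = change.data_type
        elif isinstance(change, RenameColumn):
            if change.name not in result or change.new_name in result:
                raise ValueError(f"cannot rename {change.name} to {change.new_name}")
            # Rebuild the dict so the renamed column keeps its position.
            result = {(change.new_name if k == change.name else k): v
                      for k, v in result.items()}
        elif isinstance(change, DeleteColumn):
            if change.name not in result:
                raise ValueError(f"no such column: {change.name}")
            del result[change.name]
        else:
            raise TypeError(f"unsupported change: {change!r}")
    return result
```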
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r231706583 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.types.StructType; + +import java.util.List; +import java.util.Map; + +/** + * Represents table metadata from a {@link TableCatalog} or other table sources. + */ +public interface Table { --- End diff -- I think the two `Table` classes are trying to be the same thing. This is one of the reasons why I brought it up in the sync. @cloud-fan's current PR isn't yet based on this work, so it doesn't get the abstraction right. What you linked to uses `Table` to expose `newScanConfigBuilder`, basically requiring that all tables are readable. Instead, the implementation classes in #22547 should be interfaces that extend this `Table` to make it readable.
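The suggested shape, a metadata-only `Table` plus interfaces that extend it to add readability, can be sketched in Python with abstract base classes. All names here (`SupportsRead`, `EventSink`, `EventLog`, `is_readable`) are hypothetical illustrations, not the Spark interfaces under review.

```python
from abc import ABC, abstractmethod
from typing import Dict, Iterator


class Table(ABC):
    """Metadata only; nothing here assumes the table can be read."""

    @abstractmethod
    def name(self) -> str: ...

    @abstractmethod
    def schema(self) -> Dict[str, str]: ...


class SupportsRead(Table):
    """Capability mix-in: only readable tables implement it."""

    @abstractmethod
    def scan(self) -> Iterator[tuple]: ...


class EventSink(Table):  # a valid table that cannot be read
    def name(self) -> str:
        return "event_sink"

    def schema(self) -> Dict[str, str]:
        return {"id": "bigint"}


class EventLog(SupportsRead):
    def __init__(self, rows):
        self._rows = list(rows)

    def name(self) -> str:
        return "event_log"

    def schema(self) -> Dict[str, str]:
        return {"id": "bigint"}

    def scan(self) -> Iterator[tuple]:
        return iter(self._rows)


def is_readable(table: Table) -> bool:
    return isinstance(table, SupportsRead)
```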
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r230528510 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala --- @@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType * scenarios, where some offsets after the specified initial ones can't be * properly read. */ -class KafkaContinuousReadSupport( +class KafkaContinuousInputStream( --- End diff -- I'd prefer that the commits themselves compile, but since this is separating the modes I think it could be done incrementally.
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21306 @felixcheung, we're waiting on more reviews and a community decision about how to pass partition transforms. For passing transforms, I think the most reasonable compromise is to go with a generic function application, so each transform would be passed as a function/transform name with one or more arguments, where each argument is either a column reference (by name) or a literal value. That's a fairly small public API addition, but it allows many different partitioning schemes to be expressed, including the one above for Kudu. We already have all of this implemented based on the current PR, but I can update this in the next week or so.
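The proposed encoding (a transform name plus one or more arguments, each either a column reference by name or a literal) can be sketched in Python as below. The transform names `bucket` and `days` are hypothetical examples; under this scheme a source would accept the names it understands and reject the rest.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class ColumnRef:
    name: str


@dataclass(frozen=True)
class Literal:
    value: object


Argument = Union[ColumnRef, Literal]


@dataclass(frozen=True)
class Transform:
    """A partition transform: a function name applied to one or more arguments."""
    name: str
    args: Tuple[Argument, ...]

    def __post_init__(self):
        if not self.args:
            raise ValueError("a transform needs at least one argument")

    def columns(self):
        return [a.name for a in self.args if isinstance(a, ColumnRef)]

    def describe(self) -> str:
        rendered = [a.name if isinstance(a, ColumnRef) else repr(a.value) for a in self.args]
        return f"{self.name}({', '.join(rendered)})"


# Hypothetical transform names, for illustration only.
bucket_by_id = Transform("bucket", (Literal(16), ColumnRef("id")))
daily = Transform("days", (ColumnRef("ts"),))
```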
[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/22547 @jose-torres, I don't mean that the primary purpose of the v2 API is catalog integration; I mean that the primary use of v2 is with tables that are stored in some catalog. So we should make sure that the plan and design work well with catalog tables. Another reason that catalog tables are important is that the v2 plans require a catalog for consistent behavior. So catalogs are important, and I think they will affect the implementation details.
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226798538 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Format.java --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.DataSourceRegister; +import org.apache.spark.sql.types.StructType; + +/** + * The base interface for data source v2. Implementations must have a public, 0-arg constructor. + * + * The major responsibility of this interface is to return a {@link Table} for read/write. + */ +@InterfaceStability.Evolving +public interface Format extends DataSourceV2 { + + /** + * Return a {@link Table} instance to do read/write with user-specified options. + * + * @param options the user-specified options that can identify a table, e.g. path, table name, --- End diff -- Why is it necessary to pass table name and database to Format? Format should only be used in 2 places to create tables. First, in the DataFrameReader (or writer) API when a format is specified directly instead of a catalog/database/table or catalog/path. 
Second, it would be used in catalogs that support pluggable implementations, like the current session catalog, which needs to dynamically instantiate implementations based on the table's provider.
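The two uses of `Format` described above can be sketched in Python, assuming a plain dict of provider name to factory stands in for `Format` implementations. `FORMATS`, `load_by_format`, and `SessionCatalog` are hypothetical names. The point illustrated is that name resolution happens in the catalog, so the format never needs a table or database name.

```python
from typing import Callable, Dict


class Table:
    def __init__(self, provider: str, options: Dict[str, str]):
        self.provider = provider
        self.options = dict(options)


# Hypothetical registry: provider name -> factory standing in for a Format.
FORMATS: Dict[str, Callable[[Dict[str, str]], Table]] = {
    "parquet": lambda opts: Table("parquet", opts),
    "csv": lambda opts: Table("csv", opts),
}


def load_by_format(provider: str, options: Dict[str, str]) -> Table:
    """Use 1: a format is named directly, as with DataFrameReader.format(...)."""
    return FORMATS[provider](options)


class SessionCatalog:
    """Use 2: the catalog resolves names, then instantiates the provider."""

    def __init__(self):
        self._metadata = {}  # table name -> (provider, table options)

    def create_table(self, name: str, provider: str, options: Dict[str, str]):
        if provider not in FORMATS:
            raise ValueError(f"unknown provider: {provider}")
        self._metadata[name] = (provider, dict(options))

    def load_table(self, name: str) -> Table:
        provider, options = self._metadata[name]
        # The name never reaches the format; only the stored options do.
        return load_by_format(provider, options)
```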
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226798213 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Format.java --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.DataSourceRegister; +import org.apache.spark.sql.types.StructType; + +/** + * The base interface for data source v2. Implementations must have a public, 0-arg constructor. + * + * The major responsibility of this interface is to return a {@link Table} for read/write. + */ +@InterfaceStability.Evolving +public interface Format extends DataSourceV2 { --- End diff -- Why is there both Format and DataSourceV2? What does DataSourceV2 do?
[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/22547 After looking at the changes, I want to reiterate my request for a design doc. I think that code is a great way to prototype a design, but we need to step back and make sure that the design makes sense when you view it from a high level. I have two main motivations for that point. First, there are some classes that I don't see a justification for, like having a separate ScanConfig, BatchScan, and PartitionReaderFactory. Are all of those separate classes necessary? Can a ScanConfigBuilder return a BatchScan? Can BatchScan expose a createBatchReader(InputPartition) method? My second motivation for saying we need a clear design doc is that I think that the current way to interact with v2 doesn't fit well with catalogs. This is based around Format, which is based on the v1 method of loading read and write implementations. But that isn't the primary way that v2 will be used. It happens to be the only way to call into the v2 API from Spark today, but the primary use of v2 is to integrate sources that are actually modeled as tables in some catalog. For example, Format exposes getTable that returns a Table implementation from DataSourceOptions. Those options have tableName and databaseName methods. But tables that are identified by name shouldn't be loaded by a Format; they should be loaded by a catalog. It also uses the options for both table options and read options because there isn't a way to pass both. But most tables will be created with table options by a catalog and will accept read-specific options passed to the DataFrameReader. I think we would approach a usable API much sooner if this work were planned based on a shared understanding of how catalogs and tables will interact in the future. Not having a catalog API right now is affecting the way tables work in this PR, and that's a concern for me.
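The distinction drawn above, where table options are fixed by the catalog when a table is created and read options arrive per read, can be sketched in Python. The classes and the option keys `compression` and `split-size` are hypothetical. Read options are kept on the scan and never merged into the table, so no precedence rule is assumed.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class Scan:
    table_name: str
    read_options: Dict[str, str]


@dataclass
class Table:
    name: str
    table_options: Dict[str, str]  # set when the catalog created the table

    def new_scan(self, read_options: Dict[str, str]) -> Scan:
        # Read-specific options belong to this scan only.
        return Scan(self.name, dict(read_options))


class Catalog:
    def __init__(self):
        self._tables: Dict[str, Table] = {}

    def create_table(self, name: str, table_options: Dict[str, str]) -> Table:
        table = Table(name, dict(table_options))
        self._tables[name] = table
        return table

    def load_table(self, name: str) -> Table:
        # Tables identified by name are loaded here, not by a Format.
        return self._tables[name]
```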
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226796934 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -173,12 +185,17 @@ object DataSourceV2Relation { source: DataSourceV2, options: Map[String, String], tableIdent: Option[TableIdentifier] = None, - userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = { -val readSupport = source.createReadSupport(options, userSpecifiedSchema) -val output = readSupport.fullSchema().toAttributes + userSpecifiedSchema: Option[StructType] = None): Option[DataSourceV2Relation] = { --- End diff -- This shouldn't return an `Option`. A relation is not only a read-side structure; it is also used in write-side logical plans as the target of a write. Validation rules like PreprocessTableInsertion validate the write dataframe against the relation's schema. That's why the relation has a newWriteSupport method. Creating a relation from a Table should always work, even if the table isn't readable or isn't writable. Analysis can be done later to validate whether the plan that contains a relation can actually use the table.
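A Python sketch of the behavior argued for above: building a relation from any table always succeeds, and readability is checked later, only where a plan actually reads the relation. `ReadableTable`, `create_relation`, and `check_scan` are hypothetical names, not Spark's analyzer rules.

```python
from dataclasses import dataclass
from typing import Dict, List


class Table:
    def __init__(self, name: str, schema: Dict[str, str]):
        self.name = name
        self.schema = schema


class ReadableTable(Table):
    """Hypothetical capability marker for tables that can be scanned."""


@dataclass
class Relation:
    table: Table

    @property
    def output(self) -> List[str]:
        # Taken from the table's schema, so no reader is needed.
        return list(self.table.schema)


def create_relation(table: Table) -> Relation:
    # Always succeeds: no Option/None for tables that are not readable.
    return Relation(table)


def check_scan(relation: Relation) -> None:
    """Analysis-time check, applied only where the plan reads the relation."""
    if not isinstance(relation.table, ReadableTable):
        raise ValueError(f"table {relation.table.name} does not support reads")
```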
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226790252 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/InputStream.java --- @@ -17,14 +17,18 @@ package org.apache.spark.sql.sources.v2.reader.streaming; -import org.apache.spark.sql.sources.v2.reader.ReadSupport; +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; /** - * A base interface for streaming read support. This is package private and is invisible to data - * sources. Data sources should implement concrete streaming read support interfaces: - * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + * An interface representing a readable data stream in a streaming query. It's responsible to manage + * the offsets of the streaming source in this streaming query. + * + * Data sources should implement concrete input stream interfaces: {@link MicroBatchInputStream} and + * {@link ContinuousInputStream}. */ -interface StreamingReadSupport extends ReadSupport { +@InterfaceStability.Evolving +public interface InputStream extends BaseStreamingSource { --- End diff -- `InputStream` conflicts with a well-known JVM class, [`java.io.InputStream`](https://docs.oracle.com/javase/9/docs/api/java/io/InputStream.html). I think this should be renamed to be more specific to a streaming table scan.
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226789748 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java --- @@ -15,37 +15,43 @@ * limitations under the License. */ -package org.apache.spark.sql.sources.v2.reader; +package org.apache.spark.sql.sources.v2; import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.datasources.v2.NoopScanConfigBuilder; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; +import org.apache.spark.sql.types.StructType; /** - * An interface that defines how to load the data from data source for batch processing. + * An interface representing a logical structured data set of a data source. For example, the + * implementation can be a directory on the file system, or a table in the catalog, etc. * - * The execution engine will get an instance of this interface from a data source provider - * (e.g. {@link org.apache.spark.sql.sources.v2.BatchReadSupportProvider}) at the start of a batch - * query, then call {@link #newScanConfigBuilder()} and create an instance of {@link ScanConfig}. - * The {@link ScanConfigBuilder} can apply operator pushdown and keep the pushdown result in - * {@link ScanConfig}. The {@link ScanConfig} will be used to create input partitions and reader - * factory to scan data from the data source with a Spark job. + * This interface can mixin the following interfaces to support different operations: + * + * {@link SupportsBatchRead}: this table can be read in batch queries. + * {@link SupportsMicroBatchRead}: this table can be read in streaming queries with + * micro-batch trigger. + * {@link SupportsContinuousRead}: this table can be read in streaming queries with + * continuous trigger. 
+ * */ @InterfaceStability.Evolving -public interface BatchReadSupport extends ReadSupport { +public interface Table { + + /** + * Returns the schema of this table. + */ + StructType schema(); /** * Returns a builder of {@link ScanConfig}. Spark will call this method and create a * {@link ScanConfig} for each data scanning job. * * The builder can take some query specific information to do operators pushdown, and keep these * information in the created {@link ScanConfig}. - * - * This is the first step of the data scan. All other methods in {@link BatchReadSupport} needs - * to take {@link ScanConfig} as an input. - */ - ScanConfigBuilder newScanConfigBuilder(); - - /** - * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}. */ - PartitionReaderFactory createReaderFactory(ScanConfig config); + default ScanConfigBuilder newScanConfigBuilder(DataSourceOptions options) { --- End diff -- I think it should be clear that these are scan-specific options. Maybe add some documentation with an example of something that would be passed to configure a scan, like a target split size for combining.
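To illustrate the kind of scan-specific option mentioned above, here is a hypothetical Python sketch of a target split size used to combine small files into splits. The option key `split-size`, the 128 MiB default, and the greedy in-order packing are all assumptions made for the example, not behavior of any Spark source.

```python
def combine_splits(file_sizes, target_split_size):
    """Greedily pack files, in order, into splits of at most target_split_size bytes.

    A file larger than the target ends up in a split of its own.
    """
    if target_split_size <= 0:
        raise ValueError("target_split_size must be positive")
    splits, current, current_size = [], [], 0
    for size in file_sizes:
        if current and current_size + size > target_split_size:
            splits.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        splits.append(current)
    return splits


def plan_scan(file_sizes, scan_options, default_split_size=128 * 1024 * 1024):
    # Hypothetical scan option: read per scan, not stored on the table.
    target = int(scan_options.get("split-size", default_split_size))
    return combine_splits(file_sizes, target)
```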
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226789610 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.reader.BatchScan; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * A mix-in interface for {@link Table}. Table implementations can mixin this interface to + * provide data reading ability for batch processing. + */ +@InterfaceStability.Evolving +public interface SupportsBatchRead extends Table { + + /** + * Creates a {@link BatchScan} instance with a {@link ScanConfig} and user-specified options. + * + * @param config a {@link ScanConfig} which may contains operator pushdown information. + * @param options the user-specified options, which is same as the one used to create the + *{@link ScanConfigBuilder} that built the given {@link ScanConfig}. 
+ */ + BatchScan createBatchScan(ScanConfig config, DataSourceOptions options); --- End diff -- Is there a benefit to having both `ScanConfig` and `BatchScan` objects? Why not have `ScanConfigBuilder` return a `BatchScan` directly by calling `buildBatch`?
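A Python sketch of the alternative raised above: the builder receives options once, records pushdown results, and `build_batch` returns the scan directly, with no intermediate config object and no second options argument. The classes, the equality-only pushdown rule, and the `split-size` key are hypothetical.

```python
from typing import Dict, List, Tuple


class BatchScan:
    def __init__(self, files: List[str], filters, options: Dict[str, str]):
        self.files = files
        self.filters = filters
        self.options = options

    def plan_partitions(self) -> List[str]:
        return list(self.files)


class ScanBuilder:
    """Hypothetical builder: options are given once, at creation."""

    def __init__(self, files: List[str], options: Dict[str, str]):
        self._files = list(files)
        self._options = dict(options)
        self._filters: List[Tuple[str, str, object]] = []

    def push_filter(self, column: str, op: str, value) -> bool:
        # Accept only equality filters; others stay in the engine's plan.
        if op == "=":
            self._filters.append((column, op, value))
            return True
        return False

    def build_batch(self) -> BatchScan:
        # Pushdown results and options go straight into the scan.
        return BatchScan(self._files, list(self._filters), self._options)
```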
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226785695 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchScan.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.SupportsBatchRead; +import org.apache.spark.sql.sources.v2.Table; + +/** + * A {@link Scan} for batch queries. + * + * The execution engine will get an instance of {@link Table} first, then call + * {@link Table#newScanConfigBuilder(DataSourceOptions)} and create an instance of + * {@link ScanConfig}. The {@link ScanConfigBuilder} can apply operator pushdown and keep the + * pushdown result in {@link ScanConfig}. Then + * {@link SupportsBatchRead#createBatchScan(ScanConfig, DataSourceOptions)} will be called to create + * a {@link BatchScan} instance, which will be used to create input partitions and reader factory to + * scan data from the data source with a Spark job. 
+ */ +@InterfaceStability.Evolving +public interface BatchScan extends Scan { + + /** + * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}. + */ + PartitionReaderFactory createReaderFactory(); --- End diff -- Why are `BatchScan` and `PartitionReaderFactory` different interfaces?
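A Python sketch of the merged shape asked about here (and earlier as "Can BatchScan expose a createBatchReader(InputPartition) method?"): the scan plans its partitions and creates a reader per partition itself. `RangeBatchScan` is a hypothetical example over the integers in [0, n). It runs in a single process and ignores that, in Spark, whatever creates readers has to be shipped to executors.

```python
from dataclasses import dataclass
from typing import Iterator, List


@dataclass(frozen=True)
class InputPartition:
    start: int
    end: int  # exclusive


class RangeBatchScan:
    """Plans partitions over [0, n) and creates a reader per partition itself."""

    def __init__(self, n: int, num_partitions: int):
        if num_partitions <= 0:
            raise ValueError("num_partitions must be positive")
        self.n = n
        self.num_partitions = num_partitions

    def plan_input_partitions(self) -> List[InputPartition]:
        step = max(1, -(-self.n // self.num_partitions))  # ceiling division
        return [InputPartition(s, min(s + step, self.n)) for s in range(0, self.n, step)]

    def create_batch_reader(self, partition: InputPartition) -> Iterator[int]:
        return iter(range(partition.start, partition.end))
```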
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226784919 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.reader.BatchScan; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * A mix-in interface for {@link Table}. Table implementations can mixin this interface to + * provide data reading ability for batch processing. + */ +@InterfaceStability.Evolving +public interface SupportsBatchRead extends Table { + + /** + * Creates a {@link BatchScan} instance with a {@link ScanConfig} and user-specified options. + * + * @param config a {@link ScanConfig} which may contains operator pushdown information. + * @param options the user-specified options, which is same as the one used to create the + *{@link ScanConfigBuilder} that built the given {@link ScanConfig}. --- End diff -- I don't think that options should be passed twice. 
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226783272 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala --- @@ -106,85 +107,96 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister failOnDataLoss(caseInsensitiveParams)) } - /** - * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read - * batches of Kafka data in a micro-batch streaming query. - */ - override def createMicroBatchReadSupport( - metadataPath: String, - options: DataSourceOptions): KafkaMicroBatchReadSupport = { - -val parameters = options.asMap().asScala.toMap -validateStreamOptions(parameters) -// Each running query should use its own group id. Otherwise, the query may be only assigned -// partial data since Kafka will assign partitions to multiple consumers having the same group -// id. Hence, we should generate a unique id for each query. 
-val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}" - -val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } -val specifiedKafkaParams = - parameters -.keySet -.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) -.map { k => k.drop(6).toString -> parameters(k) } -.toMap - -val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams, - STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) - -val kafkaOffsetReader = new KafkaOffsetReader( - strategy(caseInsensitiveParams), - kafkaParamsForDriver(specifiedKafkaParams), - parameters, - driverGroupIdPrefix = s"$uniqueGroupId-driver") - -new KafkaMicroBatchReadSupport( - kafkaOffsetReader, - kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId), - options, - metadataPath, - startingStreamOffsets, - failOnDataLoss(caseInsensitiveParams)) + override def getTable(options: DataSourceOptions): KafkaTable.type = { +KafkaTable } - /** - * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read - * Kafka data in a continuous streaming query. - */ - override def createContinuousReadSupport( - metadataPath: String, - options: DataSourceOptions): KafkaContinuousReadSupport = { -val parameters = options.asMap().asScala.toMap -validateStreamOptions(parameters) -// Each running query should use its own group id. Otherwise, the query may be only assigned -// partial data since Kafka will assign partitions to multiple consumers having the same group -// id. Hence, we should generate a unique id for each query. 
-val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}" - -val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } -val specifiedKafkaParams = - parameters -.keySet -.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) -.map { k => k.drop(6).toString -> parameters(k) } -.toMap + object KafkaTable extends Table --- End diff -- Why is `KafkaTable` an object, not a class? This doesn't seem to fit the abstraction.
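One way to see why a class may fit better than a singleton: each table instance can carry its own options. A hypothetical Python sketch follows. The "kafka." prefix stripping mirrors the removed Scala code (`k.drop(6)`), but `KafkaTable` and `get_table` here are illustrations, not the connector's implementation.

```python
from typing import Dict


class KafkaTable:
    """Hypothetical: one instance per set of options, instead of a singleton."""

    PREFIX = "kafka."

    def __init__(self, options: Dict[str, str]):
        self.options = dict(options)
        # Like the removed Scala code: keep options whose lower-cased key starts
        # with "kafka." and strip that prefix to get the Kafka client params.
        self.kafka_params = {
            key[len(self.PREFIX):]: value
            for key, value in self.options.items()
            if key.lower().startswith(self.PREFIX)
        }


def get_table(options: Dict[str, str]) -> KafkaTable:
    # Each call builds a new table carrying its own state.
    return KafkaTable(options)
```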
[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/22547 @cloud-fan, is there a design doc that outlines these changes and the new API structure?
[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22547#discussion_r226782371 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala --- @@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType * scenarios, where some offsets after the specified initial ones can't be * properly read. */ -class KafkaContinuousReadSupport( +class KafkaContinuousInputStream( --- End diff -- Is it possible to break this change into multiple PRs for batch, microbatch, and continuous? It's really large and it would be nice if we could get the changes in incrementally.
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r226780862 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -169,15 +174,16 @@ object DataSourceV2Relation { options: Map[String, String], tableIdent: Option[TableIdentifier] = None, userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = { -val reader = source.createReader(options, userSpecifiedSchema) +val readSupport = source.createReadSupport(options, userSpecifiedSchema) --- End diff -- In the long term, I don't think that sources should use the reader to get a schema. This is a temporary hack until we have catalog support, which is really where schemas should come from. The way this works in our version (which is substantially ahead of upstream Spark, unfortunately) is that a Table is loaded by a Catalog. The schema reported by that table is used to validate writes. That way, the table can report its schema and Spark knows that data written must be compatible with that schema, but the source isn't required to be readable.
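A Python sketch of that flow: a catalog loads the table, and the schema the table reports is used to validate incoming data, so no reader is involved. `SAFE_WIDENINGS` is a deliberately narrow, made-up compatibility rule for illustration, not Spark's actual write-compatibility check.

```python
from typing import Dict


class Table:
    def __init__(self, name: str, schema: Dict[str, str]):
        self.name = name
        self.schema = schema  # reported by the table; no reader involved


class Catalog:
    def __init__(self):
        self._tables: Dict[str, Table] = {}

    def create_table(self, name: str, schema: Dict[str, str]) -> None:
        self._tables[name] = Table(name, dict(schema))

    def load_table(self, name: str) -> Table:
        return self._tables[name]


# Hypothetical, deliberately narrow compatibility rule for illustration.
SAFE_WIDENINGS = {("int", "bigint"), ("float", "double")}


def validate_write(table: Table, data_schema: Dict[str, str]) -> None:
    """Raise if the incoming data cannot be written to the table's schema."""
    missing = [c for c in table.schema if c not in data_schema]
    extra = [c for c in data_schema if c not in table.schema]
    if missing or extra:
        raise ValueError(f"column mismatch: missing={missing}, extra={extra}")
    for col, table_type in table.schema.items():
        data_type = data_schema[col]
        if data_type != table_type and (data_type, table_type) not in SAFE_WIDENINGS:
            raise ValueError(f"cannot write {data_type} to {col}: {table_type}")
```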
[GitHub] spark pull request #22501: [SPARK-25492][TEST] Refactor WideSchemaBenchmark ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22501#discussion_r226765772
--- Diff: sql/core/benchmarks/WideSchemaBenchmark-results.txt ---
@@ -1,117 +1,145 @@
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
-Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
+
+parsing large select expressions
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 parsing large select:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-1 select expressions                           2 /    4          0.0     2050147.0       1.0X
-100 select expressions                         6 /    7          0.0     6123412.0       0.3X
-2500 select expressions                      135 /  141          0.0   134623148.0       0.0X
+1 select expressions                           2 /    4          0.0     1934953.0       1.0X
+100 select expressions                         4 /    5          0.0     3659399.0       0.5X
+2500 select expressions                       68 /   76          0.0    68278937.0       0.0X
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
-Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
+
+many column field read and write
+
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
+Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
 many column field r/w:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-1 cols x 10 rows (read in-mem)                16 /   18          6.3         158.6       1.0X
-1 cols x 10 rows (exec in-mem)                17 /   19          6.0         166.7       1.0X
-1 cols x 10 rows (read parquet)               24 /   26          4.3         235.1       0.7X
-1 cols x 10 rows (write parquet)              81 /   85          1.2         811.3       0.2X
-100 cols x 1000 rows (read in-mem)            17 /   19          6.0         166.2       1.0X
-100 cols x 1000 rows (exec in-mem)            25 /   27          4.0         249.2       0.6X
-100 cols x 1000 rows (read parquet)           23 /   25          4.4         226.0       0.7X
-100 cols x 1000 rows (write parquet)          83 /   87          1.2         831.0       0.2X
-2500 cols x 40 rows (read in-mem)            132 /  137          0.8        1322.9       0.1X
-2500 cols x 40 rows (exec in-mem)            326 /  330          0.3        3260.6       0.0X
-2500 cols x 40 rows (read parquet)           831 /  839          0.1        8305.8       0.0X
-2500 cols x 40 rows (write parquet)          237 /  245          0.4        2372.6       0.1X
-
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
-Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
+1 cols x 10 rows (read in-mem)                22 /   25          4.6         219.4       1.0X
+1 cols x 10 rows (exec in-mem)                22 /   28          4.5         223.8       1.0X
+1 cols x 10 rows (read parquet)               45 /   49          2.2         449.6       0.5X
+1 cols x 10 rows (write parquet)             204 /  223          0.5        2044.4       0.1X
--- End diff --
@dongjoon-hyun, so you are saying that it doesn't appear that there is a performance regression, right?
[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/22547 @cloud-fan, sorry to look at this so late; I was out on vacation for a little while. Is this about ready for review?
[GitHub] spark issue #22573: [SPARK-25558][SQL] Pushdown predicates for nested fields...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/22573 @dongjoon-hyun, Iceberg schema evolution is based on the field IDs, not on names. The current table schema's names are the runtime names for columns in that table, and all reads happen by first translating those names to IDs and projecting the IDs from the data files. That way, renames can never cause you to get incorrect data. You're mostly right that Spark has a problem with schema evolution for HadoopFS tables. That wouldn't affect my suggestion here, though. If you're filtering or projecting field `m.n`, then Spark currently handles that by matching columns by name. If you're matching by name, then `m.n` can't change across versions, or at least you can always project `m.n` from the data (in the case of Avro).
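The ID-based resolution described in the comment above (translate the current schema's names to IDs, then project those IDs from the data files) can be illustrated with a small Scala sketch. It is a simplified model, not Iceberg's implementation: `Field`, `DataFile`, and `readByName` are hypothetical names, and a data file is assumed to be just a map from field ID to value. In the usage lines, a file written under the schema `(1: a, 2: b)` is read after `a` was renamed to `x` and a new, unrelated column `a` (ID 3) was added: reading `x` returns the value originally written as `a`, and reading the new `a` returns `None` because the old file has no value for ID 3. A purely name-based read would have returned the old `a` data for the new column.

```scala
// Hypothetical model for illustration; these are NOT Iceberg's classes.
final case class Field(id: Int, name: String)

// A data file stores values by field ID, never by column name.
final case class DataFile(valuesById: Map[Int, Any])

// Translate requested names to IDs using the *current* table schema, then
// project those IDs from the file. A file written before a column existed
// simply has no value for that ID, so the result is None.
def readByName(currentSchema: Seq[Field], file: DataFile, names: Seq[String]): Seq[Option[Any]] = {
  val idByName = currentSchema.map(f => f.name -> f.id).toMap
  names.map { name =>
    val id = idByName.getOrElse(name, throw new IllegalArgumentException(s"Unknown column: $name"))
    file.valuesById.get(id)
  }
}

// Usage: a file written under schema (1: a, 2: b). Later, `a` was renamed to `x`
// and a new, unrelated column `a` was added with ID 3.
val oldFile = DataFile(Map(1 -> 10, 2 -> 20))
val currentSchema = Seq(Field(1, "x"), Field(2, "b"), Field(3, "a"))
val projected = readByName(currentSchema, oldFile, Seq("x", "a"))
```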
[GitHub] spark issue #22573: [SPARK-25558][SQL] Pushdown predicates for nested fields...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/22573
The approach we've taken in Iceberg is to allow `.` in names by using an index in the top-level schema. The full path of every leaf in the schema is produced and added to a map from the full field name to the field's ID. We do this to avoid two problem areas:
* Parsing the name using `.` as a delimiter
* Traversing the schema structure
For example, the schema `0: a struct<2: x int, 3: y int>, 1: a.z int` produces this index: `Map("a" -> 0, "a.x" -> 2, "a.y" -> 3, "a.z" -> 1)`. Filters like `a.x > 3` or `a.z < 5` are bound using the index instead of by parsing the field name and traversing the schema, so you get the right result without needing to decide whether "a.x" refers to a nested field or is the actual name of a top-level field. The lookup is quick and correctly produces `id(2) > 3` and `id(1) < 5`.
This is also used for projection because users want to be able to select nested columns by name using dotted field names. The only drawback to this approach is that you can't have duplicates in the index: each full field name must be unique. In the example above, the top-level `a.z` field could not be named `a.x` or else it would collide with `x` nested in `a`.
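The name index described in the comment above can be sketched in Scala using the comment's own example schema. This is a minimal illustration, not Iceberg's API: `FieldType`, `IntType`, `Struct`, `NestedField`, `BoundPredicate`, `fullNames`, `indexByName`, and `bind` are hypothetical names. Because the example index contains `"a" -> 0`, the sketch indexes every field, struct fields included, rather than only leaves. Building `index` for the example schema yields the map shown in the comment, `bind` turns `a.x > 3` into a predicate on field ID 2 and `a.z < 5` into one on field ID 1 with a single map lookup, and `indexByName` fails if two fields share a full name, which is the drawback the comment describes.

```scala
// Hypothetical schema model for illustration; these are NOT Iceberg's classes.
sealed trait FieldType
case object IntType extends FieldType
final case class Struct(fields: Seq[NestedField]) extends FieldType
final case class NestedField(id: Int, name: String, fieldType: FieldType)

// A predicate bound to a field ID, e.g. id(2) > 3.
final case class BoundPredicate(fieldId: Int, op: String, literal: Int)

// Produce (full dotted name -> ID) for every field, including struct fields
// themselves, by walking the schema once up front.
def fullNames(fields: Seq[NestedField], prefix: String = ""): Seq[(String, Int)] =
  fields.flatMap { field =>
    val fullName = prefix + field.name
    val children = field.fieldType match {
      case Struct(nested) => fullNames(nested, fullName + ".")
      case IntType        => Seq.empty[(String, Int)]
    }
    (fullName -> field.id) +: children
  }

// Build the index, rejecting schemas where two fields share a full name.
def indexByName(schema: Seq[NestedField]): Map[String, Int] = {
  val pairs = fullNames(schema)
  val duplicates = pairs.groupBy(_._1).collect { case (name, ps) if ps.size > 1 => name }
  require(duplicates.isEmpty, s"Duplicate full field names: ${duplicates.mkString(", ")}")
  pairs.toMap
}

// Binding is a single map lookup: the dotted name is never split on '.'.
def bind(index: Map[String, Int], name: String, op: String, literal: Int): BoundPredicate =
  index.get(name) match {
    case Some(id) => BoundPredicate(id, op, literal)
    case None     => throw new IllegalArgumentException(s"Cannot find field: $name")
  }

// The schema from the comment: 0: a struct<2: x int, 3: y int>, 1: a.z int
val schema = Seq(
  NestedField(0, "a", Struct(Seq(NestedField(2, "x", IntType), NestedField(3, "y", IntType)))),
  NestedField(1, "a.z", IntType))
val index = indexByName(schema)
```

Walking the schema once to build the map moves all of the ambiguity handling to index construction, where a collision can be reported as an error instead of silently binding to the wrong field.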
[GitHub] spark issue #22413: [SPARK-25425][SQL] Extra options should override session...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/22413 Thanks @MaxGekk, sorry for the original omission!