[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19448

I have a lot of sympathy for the argument that infrastructure software shouldn't have too many backports and that those should generally be bug fixes. But if I were working on a Spark distribution at a vendor, this is something I would definitely include because it's such a useful feature. I think that by not backporting this, we're just pushing that work downstream.

Plus, the risk of adding this is low: the main behavior change is that users can specify a previously-banned committer for Parquet writes. Is it a bug fix? Probably not. But it fixes a big blocker.

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r144388430

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

I think once per write operation is fine. It's not like it is once per file.
[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19448

Still +1 from me as well.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r144331909

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

I think I'd prefer the warn & continue option. It does little good to fail so late in a job, when the caller has already indicated that they want to use a different committer. Let them write the data out since this isn't a correctness issue, and they can add a summary file later if they want.

Basically, there's less annoyance and interruption by not writing a summary file than by failing a job and forcing the user to re-run near the end.
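A minimal sketch of the warn & continue option, in plain Java rather than the Scala under review. Everything here is hypothetical and only illustrates the control flow: the `SummaryCheck` class, the `Map`-based configuration, the `committerSupportsSummary` flag, and the made-up key that stands in for `ParquetOutputFormat.ENABLE_JOB_SUMMARY` are assumptions, not the PR's code.

```java
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical helper, not part of Spark or Parquet.
final class SummaryCheck {
  private static final Logger LOG = Logger.getLogger(SummaryCheck.class.getName());

  // Made-up key standing in for ParquetOutputFormat.ENABLE_JOB_SUMMARY.
  static final String ENABLE_JOB_SUMMARY = "example.enable.job.summary";

  /**
   * Decides whether a summary file should be written. If a summary was requested
   * but the chosen committer cannot produce one, log a warning and skip the
   * summary instead of failing the job.
   */
  static boolean shouldWriteSummary(Map<String, String> conf, boolean committerSupportsSummary) {
    boolean requested = Boolean.parseBoolean(conf.getOrDefault(ENABLE_JOB_SUMMARY, "false"));
    if (requested && !committerSupportsSummary) {
      LOG.warning("Summary files were requested but the configured committer "
          + "cannot write them; continuing without a summary.");
      return false;
    }
    return requested;
  }
}
```

The data still gets written in the unsupported case; only the optional summary is dropped, which matches the point that this is not a correctness issue.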
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r144097061 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link DataWriter}. + * + * The writing procedure is: + * 1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. For each partition, create the data writer, and write the data of the partition with this + * writer. 
If all the data are written successfully, call {@link DataWriter#commit()}. If + * exception happens during the writing, call {@link DataWriter#abort()}. + * 3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If + * some writers are aborted, or the job failed with an unknown reason, call + * {@link #abort(WriterCommitMessage[])}. + * + * Spark won't retry failed writing jobs, users should do it manually in their Spark applications if + * they want to retry. + * + * Please refer to the document of commit/abort methods for detailed specifications. + * + * Note that, this interface provides a protocol between Spark and data sources for transactional + * data writing, but the transaction here is Spark-level transaction, which may not be the + * underlying storage transaction. For example, Spark successfully writes data to a Cassandra data + * source, but Cassandra may need some more time to reach consistency at storage level. + */ +@InterfaceStability.Evolving +public interface DataSourceV2Writer { + + /** + * Creates a writer factory which will be serialized and sent to executors. + */ + DataWriterFactory createWriterFactory(); + + /** + * Commits this writing job with a list of commit messages. The commit messages are collected from + * successful data writers and are produced by {@link DataWriter#commit()}. If this method + * fails(throw exception), this writing job is considered to be failed, and + * {@link #abort(WriterCommitMessage[])} will be called. The written data should only be visible + * to data source readers if this method successes. + * + * Note that, one partition may have multiple committed data writers because of speculative tasks. + * Spark will pick the first successful one and get its commit message. 
Implementations should be + * aware of this and handle it correctly, e.g., have a mechanism to make sure only one data writer + * can commit successfully, or have a way to clean up the data of already-committed writers. + */ + void commit(WriterCommitMessage[] messages); + + /** + * Aborts this writing job because some data writers are failed to write the records and aborted, + * or the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} + * fails. If this method fails(throw exception), the underlying data source may have garbage that + * need to be cleaned manually, but these garbage should not be visible to data source readers. + * + * Unless the abortion is triggered by the failure of commit, the given messages should h
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r144088592

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

`AnalysisException`? Shouldn't this be `SparkException`? By the time this runs, Spark has already analyzed, optimized, and planned the job. Doesn't seem like failing analysis is appropriate.
[GitHub] spark issue #19424: [SPARK-22197][SQL] push down operators to data source be...
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19424

What are the guarantees made by the previous batches in the optimizer? The work done by `FilterAndProject` seems redundant to me because the optimizer should already push filters below projection. Is that not guaranteed by the time this runs?
[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19269

> There is no restriction to let the output of data writers be visible to other writers, so it's possible to launch a write task just for cleaning up the data of other writers.

Agreed. Other writers can possibly see and change the data. I'm not sure if you intend this to cover some aspect of abort, but if you do, this wouldn't necessarily help with cleanup.

In my use case, a writer may produce a file in any partition in S3 depending on the data it writes. To clean up after another writer, I'd have to go through all of the input data to get partitions, then go list those partition locations for uncommitted files. It's not really practical to clean up this way, so I don't think it could be a substitute for passing commit messages to job abort.
[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19269

> The only contract Spark needs is: data written/committed by tasks should not be visible to data source readers until the job-level commitment. But they can be visible to others like other writing tasks, so it's possible for data sources to implement "abort the output of the other writer".

I'm not following what you mean here.

> making DataSourceV2Writer.abort take commit messages is still a "best-effort" to clean up the data

Agreed. We should state something about this in the abort job docs: "Commit messages passed to abort are the messages for all commits that succeeded and sent a commit message to the driver. It is possible, though unlikely, for an executor to successfully commit data to a data source, but fail before sending the commit message to the driver."
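To make the "best-effort" point concrete, here is a small sketch of a job abort driven by commit messages. It is not the `DataSourceV2Writer` API: `FilesCommitted`, `Store`, and `abort` are hypothetical names, and an in-memory set stands in for S3. The sketch assumes each commit message simply lists the files its task committed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical types for illustration only.
final class AbortWithMessages {
  /** Hypothetical commit message: the files one task committed. */
  static final class FilesCommitted {
    final List<String> paths;
    FilesCommitted(String... paths) { this.paths = Arrays.asList(paths); }
  }

  /** Hypothetical in-memory store standing in for S3 or another file system. */
  static final class Store {
    final Set<String> files = new HashSet<>();
  }

  /**
   * Best-effort job abort: deletes exactly the files named in the messages that
   * reached the driver and returns the paths that were removed. Files from a task
   * that committed but never delivered its message are not seen here.
   */
  static List<String> abort(Store store, List<FilesCommitted> messages) {
    List<String> deleted = new ArrayList<>();
    for (FilesCommitted message : messages) {
      for (String path : message.paths) {
        if (store.files.remove(path)) {
          deleted.add(path);
        }
      }
    }
    return deleted;
  }
}
```

With messages in hand, abort never has to scan the input data or list partition directories. A file whose message was lost stays behind, which is the unlikely case the proposed doc sentence describes.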
[GitHub] spark pull request #19424: [SPARK-22197][SQL] push down operators to data so...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19424#discussion_r143597547 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, Expression, PredicateHelper} +import org.apache.spark.sql.catalyst.optimizer.{PushDownPredicate, RemoveRedundantProject} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources +import org.apache.spark.sql.sources.v2.reader._ + +/** + * Pushes down various operators to the underlying data source for better performance. Operators are + * being pushed down with a specific order. As an example, given a LIMIT has a FILTER child, you + * can't push down LIMIT if FILTER is not completely pushed down. When both are pushed down, the + * data source should execute FILTER before LIMIT. 
And required columns are calculated at the end, + * because when more operators are pushed down, we may need less columns at Spark side. + */ +object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHelper { + override def apply(plan: LogicalPlan): LogicalPlan = { +// make sure filters are at very bottom. +val prepared = PushDownPredicate(plan) +val afterPushDown = prepared transformUp { + case Filter(condition, r @ DataSourceV2Relation(_, reader)) => +val (candidates, containingNonDeterministic) = + splitConjunctivePredicates(condition).span(_.deterministic) + +val stayUpFilters: Seq[Expression] = reader match { + case r: SupportsPushDownCatalystFilters => +r.pushCatalystFilters(candidates.toArray) + + case r: SupportsPushDownFilters => +// A map from original Catalyst expressions to corresponding translated data source +// filters. If a predicate is not in this map, it means it cannot be pushed down. +val translatedMap: Map[Expression, sources.Filter] = candidates.flatMap { p => + DataSourceStrategy.translateFilter(p).map(f => p -> f) +}.toMap + +// Catalyst predicate expressions that cannot be converted to data source filters. +val nonConvertiblePredicates = candidates.filterNot(translatedMap.contains) + +// Data source filters that cannot be pushed down. An unhandled filter means +// the data source cannot guarantee the rows returned can pass the filter. +// As a result we must return it so Spark can plan an extra filter operator. +val unhandledFilters = r.pushFilters(translatedMap.values.toArray).toSet +val unhandledPredicates = translatedMap.filter { case (_, f) => + unhandledFilters.contains(f) +}.keys + +nonConvertiblePredicates ++ unhandledPredicates + + case _ => candidates +} + +val filterCondition = (stayUpFilters ++ containingNonDeterministic).reduceLeftOption(And) +filterCondition.map(Filter(_, r)).getOrElse(r) + + // TODO: add more push down rules. 
+} + +// TODO: nested fields pruning +def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: Seq[Attribute]): Unit = { + plan match { +case Project(projectList, child) => + val required = projectList.filter(requiredByParent.contains).flatMap(_.references) + pushDownRequiredColumns(child, required) + +case Fil
[GitHub] spark pull request #19424: [SPARK-22197][SQL] push down operators to data so...
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19424#discussion_r143593605

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.optimizer.{PushDownPredicate, RemoveRedundantProject}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.sources.v2.reader._
+
+/**
+ * Pushes down various operators to the underlying data source for better performance. Operators are
+ * being pushed down with a specific order. As an example, given a LIMIT has a FILTER child, you
+ * can't push down LIMIT if FILTER is not completely pushed down. When both are pushed down, the
+ * data source should execute FILTER before LIMIT. And required columns are calculated at the end,
+ * because when more operators are pushed down, we may need less columns at Spark side.
+ */
+object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    // make sure filters are at very bottom.
+    val prepared = PushDownPredicate(plan)
+    val afterPushDown = prepared transformUp {
+      case Filter(condition, r @ DataSourceV2Relation(_, reader)) =>
+        val (candidates, containingNonDeterministic) =
+          splitConjunctivePredicates(condition).span(_.deterministic)
--- End diff --

It isn't immediately clear why you would use `span` here instead of `partition`. I think it is because `span` will produce all deterministic predicates that would be run before the first non-deterministic predicate in an in-order traversal of the condition, right? If so, then a comment would be really useful to make this clear. I'd also like to see a comment about why deterministic predicates "after" the first non-deterministic predicate shouldn't be pushed down. An example would really help, too.
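As an illustration of the difference the comment asks about, the following sketch reimplements the semantics of Scala's `span` and `partition` in plain Java. The `SpanVsPartition` class and its string-based "predicates" are hypothetical; real Catalyst expressions carry a `deterministic` flag, which is approximated here by checking for a `rand()` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration of span vs. partition semantics.
final class SpanVsPartition {
  /** Stand-in for Expression.deterministic: anything calling rand() is non-deterministic. */
  static boolean isDeterministic(String predicate) {
    return !predicate.contains("rand()");
  }

  /** Like Scala's span: the longest prefix matching p, then everything after it. */
  static <T> List<List<T>> span(List<T> xs, Predicate<T> p) {
    int i = 0;
    while (i < xs.size() && p.test(xs.get(i))) {
      i++;
    }
    List<List<T>> result = new ArrayList<>();
    result.add(new ArrayList<>(xs.subList(0, i)));
    result.add(new ArrayList<>(xs.subList(i, xs.size())));
    return result;
  }

  /** Like Scala's partition: all matching elements, then all non-matching ones. */
  static <T> List<List<T>> partition(List<T> xs, Predicate<T> p) {
    List<T> matching = new ArrayList<>();
    List<T> rest = new ArrayList<>();
    for (T x : xs) {
      (p.test(x) ? matching : rest).add(x);
    }
    List<List<T>> result = new ArrayList<>();
    result.add(matching);
    result.add(rest);
    return result;
  }
}
```

For the conjuncts `a = 1`, `rand() > 0.5`, `b = 2`, `span` yields `[a = 1]` as candidates and keeps `rand() > 0.5` and `b = 2` above the scan, while `partition` would also offer `b = 2` for pushdown. One plausible reason to prefer `span`, which the author would need to confirm, is that pushing `b = 2` below `rand() > 0.5` changes which rows the non-deterministic predicate is evaluated on.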
[GitHub] spark pull request #19424: [SPARK-22197][SQL] push down operators to data so...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19424#discussion_r143597669 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, Expression, PredicateHelper} +import org.apache.spark.sql.catalyst.optimizer.{PushDownPredicate, RemoveRedundantProject} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources +import org.apache.spark.sql.sources.v2.reader._ + +/** + * Pushes down various operators to the underlying data source for better performance. Operators are + * being pushed down with a specific order. As an example, given a LIMIT has a FILTER child, you + * can't push down LIMIT if FILTER is not completely pushed down. When both are pushed down, the + * data source should execute FILTER before LIMIT. 
And required columns are calculated at the end, + * because when more operators are pushed down, we may need less columns at Spark side. + */ +object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHelper { + override def apply(plan: LogicalPlan): LogicalPlan = { +// make sure filters are at very bottom. +val prepared = PushDownPredicate(plan) +val afterPushDown = prepared transformUp { + case Filter(condition, r @ DataSourceV2Relation(_, reader)) => +val (candidates, containingNonDeterministic) = + splitConjunctivePredicates(condition).span(_.deterministic) + +val stayUpFilters: Seq[Expression] = reader match { + case r: SupportsPushDownCatalystFilters => +r.pushCatalystFilters(candidates.toArray) + + case r: SupportsPushDownFilters => +// A map from original Catalyst expressions to corresponding translated data source +// filters. If a predicate is not in this map, it means it cannot be pushed down. +val translatedMap: Map[Expression, sources.Filter] = candidates.flatMap { p => + DataSourceStrategy.translateFilter(p).map(f => p -> f) +}.toMap + +// Catalyst predicate expressions that cannot be converted to data source filters. +val nonConvertiblePredicates = candidates.filterNot(translatedMap.contains) + +// Data source filters that cannot be pushed down. An unhandled filter means +// the data source cannot guarantee the rows returned can pass the filter. +// As a result we must return it so Spark can plan an extra filter operator. +val unhandledFilters = r.pushFilters(translatedMap.values.toArray).toSet +val unhandledPredicates = translatedMap.filter { case (_, f) => + unhandledFilters.contains(f) +}.keys + +nonConvertiblePredicates ++ unhandledPredicates + + case _ => candidates +} + +val filterCondition = (stayUpFilters ++ containingNonDeterministic).reduceLeftOption(And) +filterCondition.map(Filter(_, r)).getOrElse(r) + + // TODO: add more push down rules. 
+} + +// TODO: nested fields pruning +def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: Seq[Attribute]): Unit = { + plan match { +case Project(projectList, child) => + val required = projectList.filter(requiredByParent.contains).flatMap(_.references) + pushDownRequiredColumns(child, required) + +case Fil
[GitHub] spark pull request #19424: [SPARK-22197][SQL] push down operators to data so...
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19424#discussion_r143591559

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.optimizer.{PushDownPredicate, RemoveRedundantProject}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.sources.v2.reader._
+
+/**
+ * Pushes down various operators to the underlying data source for better performance. Operators are
+ * being pushed down with a specific order. As an example, given a LIMIT has a FILTER child, you
+ * can't push down LIMIT if FILTER is not completely pushed down. When both are pushed down, the
+ * data source should execute FILTER before LIMIT. And required columns are calculated at the end,
+ * because when more operators are pushed down, we may need less columns at Spark side.
+ */
+object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    // make sure filters are at very bottom.
+    val prepared = PushDownPredicate(plan)
--- End diff --

Why apply this rule one more time? Is there reason to suspect that predicates won't already be pushed and that one more run would be worth it?
[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19394

Thanks for reviewing, @gatorsmile!
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r143524057

--- Diff: sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleWritableDataSource.java ---
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.ReadTask;
+import org.apache.spark.sql.sources.v2.writer.*;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+
+public class JavaSimpleWritableDataSource implements DataSourceV2, ReadSupport, WriteSupport {
+  private StructType schema = new StructType().add("i", "long").add("j", "long");
+
+  class Reader implements DataSourceV2Reader {
+    private String path;
+
+    Reader(String path) {
+      this.path = path;
+    }
+
+    @Override
+    public StructType readSchema() {
+      return schema;
+    }
+
+    @Override
+    public List<ReadTask<Row>> createReadTasks() {
+      return java.util.Arrays.asList(new JavaSimpleCSVReadTask(path));
+    }
+  }
+
+  static class JavaSimpleCSVReadTask implements ReadTask<Row>, DataReader<Row> {
+    private String path;
+    private volatile Iterator<String> lines;
+    private volatile String currentLine;
+
+    JavaSimpleCSVReadTask(Iterator<String> lines) {
+      this.lines = lines;
+    }
+
+    JavaSimpleCSVReadTask(String path) {
+      this.path = path;
+    }
+
+    @Override
+    public DataReader<Row> createReader() {
+      assert path != null;
+      try {
+        if (Files.exists(Paths.get(path))) {
+          return new JavaSimpleCSVReadTask(Files.readAllLines(Paths.get(path)).iterator());
+        } else {
+          return new JavaSimpleCSVReadTask(Collections.emptyIterator());
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
--- End diff --

Does Spark have a `RuntimeIOException`? I typically like to use those so that the IOException can still be caught and handled by code that chooses to.
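Whether Spark ships its own `RuntimeIOException` is left open by the comment. As a sketch of the idea, the JDK (8 and later) provides `java.io.UncheckedIOException`, which wraps an `IOException` so that callers who care can still catch the specific type and unwrap the cause. The `LineSource` class and `readLinesOrEmpty` method below are hypothetical names, not code from the PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;

// Hypothetical helper showing the exception-wrapping pattern.
final class LineSource {
  /**
   * Reads all lines of the file, or returns an empty list if it does not exist.
   * I/O failures surface as UncheckedIOException rather than a bare
   * RuntimeException, so callers can tell them apart from other runtime errors.
   */
  static List<String> readLinesOrEmpty(Path path) {
    try {
      if (Files.exists(path)) {
        return Files.readAllLines(path);
      }
      return Collections.emptyList();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A caller that wants to handle the failure can write `catch (UncheckedIOException e)` and use `e.getCause()` to get the original `IOException`; code that does not care simply lets it propagate.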
[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143517522

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -274,19 +274,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val byteArrayRdd = getByteArrayRdd()

     val results = ArrayBuffer[InternalRow]()
-    byteArrayRdd.collect().foreach { bytes =>
-      decodeUnsafeRows(bytes).foreach(results.+=)
+    byteArrayRdd.collect().foreach { rdd =>
+      decodeUnsafeRows(rdd._2).foreach(results.+=)
     }
     results.toArray
   }

+  private[spark] def executeCollectIterator(): (Long, Iterator[InternalRow]) = {
+    val countsAndBytes = getByteArrayRdd().collect()
+    val total = countsAndBytes.map(_._1).sum
+    val rows = countsAndBytes.iterator.flatMap(rdd => decodeUnsafeRows(rdd._2))
+    (total, rows)
+  }
+
   /**
    * Runs this query returning the result as an iterator of InternalRow.
    *
    * @note Triggers multiple jobs (one for each partition).
    */
   def executeToIterator(): Iterator[InternalRow] = {
-    getByteArrayRdd().toLocalIterator.flatMap(decodeUnsafeRows)
+    getByteArrayRdd().toLocalIterator.flatMap(rdd => decodeUnsafeRows(rdd._2))
--- End diff --

Fixed.
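The shape of `executeCollectIterator` in the diff above (collect per-partition pairs of row count and encoded bytes, sum the counts eagerly, decode rows lazily) can be sketched outside Spark as follows. This is a plain-Java analogue under stated assumptions: `Part`, `decode`, `total`, and `rows` are hypothetical, and the "encoding" is just newline-separated UTF-8 text rather than Spark's unsafe row format.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical analogue of the (count, bytes) collection pattern.
final class CollectIterator {
  /** One collected partition: its row count and its encoded rows. */
  static final class Part {
    final long count;
    final byte[] bytes;
    Part(long count, byte[] bytes) { this.count = count; this.bytes = bytes; }
  }

  /** Hypothetical decoder: rows are newline-separated UTF-8 strings. */
  static Iterator<String> decode(byte[] bytes) {
    if (bytes.length == 0) {
      return Collections.emptyIterator();
    }
    return Arrays.asList(new String(bytes, StandardCharsets.UTF_8).split("\n")).iterator();
  }

  /** Total row count, available without decoding any partition. */
  static long total(List<Part> parts) {
    long sum = 0;
    for (Part part : parts) {
      sum += part.count;
    }
    return sum;
  }

  /** Lazily decodes one partition at a time as the caller consumes rows. */
  static Iterator<String> rows(List<Part> parts) {
    final Iterator<Part> outer = parts.iterator();
    return new Iterator<String>() {
      private Iterator<String> current = Collections.emptyIterator();

      @Override
      public boolean hasNext() {
        // Advance to the next non-empty partition only when needed.
        while (!current.hasNext() && outer.hasNext()) {
          current = decode(outer.next().bytes);
        }
        return current.hasNext();
      }

      @Override
      public String next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return current.next();
      }
    };
  }
}
```

Because the count travels with the bytes, a consumer can size a structure up front and then stream rows into it, holding only one decoded partition at a time instead of a fully materialized array of rows.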
[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143517490

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -274,19 +274,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val byteArrayRdd = getByteArrayRdd()

     val results = ArrayBuffer[InternalRow]()
-    byteArrayRdd.collect().foreach { bytes =>
-      decodeUnsafeRows(bytes).foreach(results.+=)
+    byteArrayRdd.collect().foreach { rdd =>
+      decodeUnsafeRows(rdd._2).foreach(results.+=)
     }
     results.toArray
   }

+  private[spark] def executeCollectIterator(): (Long, Iterator[InternalRow]) = {
+    val countsAndBytes = getByteArrayRdd().collect()
+    val total = countsAndBytes.map(_._1).sum
+    val rows = countsAndBytes.iterator.flatMap(rdd => decodeUnsafeRows(rdd._2))
--- End diff --

Fixed.
[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19394#discussion_r143317742 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala --- @@ -58,7 +58,7 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext { withSQLConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION.key -> "1") { // If we only sample one point, the range boundaries will be pretty bad and the // chi-sq value would be very high. -assert(computeChiSquareTest() > 1000) +assert(computeChiSquareTest() > 300) --- End diff -- Updating the SparkPlan code to avoid creating unnecessary RDDs fixed this test because the same random seed is used. But, I think we should keep this change so we don't have to track down this problem again in the future. This bound is perfectly safe because the balanced chi-sq value is 10, while the unbalanced is much larger. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19394#discussion_r143317686 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -280,13 +280,20 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ results.toArray } + private[spark] def executeCollectIterator(): (Long, Iterator[InternalRow]) = { +val countsAndBytes = getByteArrayRdd().collect() +val total = countsAndBytes.map(_._1).sum +val rows = countsAndBytes.iterator.map(_._2).flatMap(decodeUnsafeRows) +(total, rows) + } + /** * Runs this query returning the result as an iterator of InternalRow. * * @note Triggers multiple jobs (one for each partition). */ def executeToIterator(): Iterator[InternalRow] = { -getByteArrayRdd().toLocalIterator.flatMap(decodeUnsafeRows) +getByteArrayRdd().toLocalIterator.map(_._2).flatMap(decodeUnsafeRows) --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19394#discussion_r143317518 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -280,13 +280,20 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ results.toArray } + private[spark] def executeCollectIterator(): (Long, Iterator[InternalRow]) = { +val countsAndBytes = getByteArrayRdd().collect() +val total = countsAndBytes.map(_._1).sum +val rows = countsAndBytes.iterator.map(_._2).flatMap(decodeUnsafeRows) +(total, rows) + } + /** * Runs this query returning the result as an iterator of InternalRow. * * @note Triggers multiple jobs (one for each partition). */ def executeToIterator(): Iterator[InternalRow] = { -getByteArrayRdd().toLocalIterator.flatMap(decodeUnsafeRows) +getByteArrayRdd().toLocalIterator.map(_._2).flatMap(decodeUnsafeRows) --- End diff -- Good point. I guess from the tests that this will create a new RDD, which isn't what we want. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/19448 +1 I completely agree that using a ParquetOutputCommitter should be optional.
[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/19394 Does anyone have a clue what the Python error could be? It doesn't look related.
[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19394#discussion_r143286130 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala --- @@ -73,25 +73,37 @@ case class BroadcastExchangeExec( try { val beforeCollect = System.nanoTime() // Note that we use .executeCollect() because we don't want to convert data to Scala types - val input: Array[InternalRow] = child.executeCollect() - if (input.length >= 51200) { + val (numRows, input) = child.executeCollectIterator() + if (numRows >= 51200) { throw new SparkException( - s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows") + s"Cannot broadcast the table with more than 512 millions rows: $numRows rows") } + val beforeBuild = System.nanoTime() longMetric("collectTime") += (beforeBuild - beforeCollect) / 100 - val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum + + // Construct the relation. + val relation = mode.transform(input, Some(numRows)) + + val dataSize = relation match { +case map: HashedRelation => + map.estimatedSize +case arr: Array[InternalRow] => + arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum +case _ => + numRows * 512 // guess: each row is about 512 bytes --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19394#discussion_r143286104 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala --- @@ -73,25 +73,37 @@ case class BroadcastExchangeExec( try { val beforeCollect = System.nanoTime() // Note that we use .executeCollect() because we don't want to convert data to Scala types --- End diff -- Updated. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19394#discussion_r143285737 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala --- @@ -58,7 +58,7 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext { withSQLConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION.key -> "1") { // If we only sample one point, the range boundaries will be pretty bad and the // chi-sq value would be very high. -assert(computeChiSquareTest() > 1000) +assert(computeChiSquareTest() > 300) --- End diff -- @rxin, the difference that is causing this to fail is that `rdd.id` is 15 instead of 14 because of the change to `getByteArrayRdd`. That id is used to seed the random generator and the chi-sq value isn't high enough. The previous value must have been unusually high. With so few data points, this can vary quite a bit so I think changing the bound to 300 is a good fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
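To make the balanced-versus-unbalanced comparison concrete, here is a minimal sketch of a chi-square statistic over partition sizes. It assumes the expected distribution is uniform across partitions; `ChiSquareSketch` and `chiSquare` are hypothetical names, and this is not the suite's actual `computeChiSquareTest` implementation.

```scala
object ChiSquareSketch {
  // Chi-square statistic of observed partition sizes against a uniform expectation.
  // Assumption: every partition is expected to hold total / numPartitions rows.
  def chiSquare(observed: Seq[Long]): Double = {
    val expected = observed.sum.toDouble / observed.size
    observed.map { o =>
      val diff = o - expected
      diff * diff / expected
    }.sum
  }
}
```

A perfectly even split scores 0, and the statistic grows quickly as rows pile up in a few partitions, which is why a lower bound well above the balanced value can still separate the two cases.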
[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19394#discussion_r143228054 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala --- @@ -73,25 +73,37 @@ case class BroadcastExchangeExec( try { val beforeCollect = System.nanoTime() // Note that we use .executeCollect() because we don't want to convert data to Scala types - val input: Array[InternalRow] = child.executeCollect() - if (input.length >= 51200) { + val (numRows, input) = child.executeCollectIterator() + if (numRows >= 51200) { throw new SparkException( - s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows") + s"Cannot broadcast the table with more than 512 millions rows: $numRows rows") } + val beforeBuild = System.nanoTime() longMetric("collectTime") += (beforeBuild - beforeCollect) / 100 - val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum + + // Construct the relation. + val relation = mode.transform(input, Some(numRows)) + + val dataSize = relation match { +case map: HashedRelation => + map.estimatedSize --- End diff -- True, but I think using the accurate size is better. Maybe we should revisit the 8gb maximum instead of using a size that isn't correct. What's the reason for the 8gb max? In master, building a broadcast table just under the 8gb limit would require 8gb for the rows (what we actually count here), plus the size of those rows compressed, which is probably at least 1gb. So we're talking about a pretty huge broadcast table to hit the case where this check will cause failures that didn't happen before. Not that it couldn't happen, but I've never seen this in practice and we have some huge jobs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19394#discussion_r143226823 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala --- @@ -73,25 +73,37 @@ case class BroadcastExchangeExec( try { val beforeCollect = System.nanoTime() // Note that we use .executeCollect() because we don't want to convert data to Scala types - val input: Array[InternalRow] = child.executeCollect() - if (input.length >= 51200) { + val (numRows, input) = child.executeCollectIterator() + if (numRows >= 51200) { throw new SparkException( - s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows") + s"Cannot broadcast the table with more than 512 millions rows: $numRows rows") } + val beforeBuild = System.nanoTime() longMetric("collectTime") += (beforeBuild - beforeCollect) / 100 - val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum + + // Construct the relation. + val relation = mode.transform(input, Some(numRows)) + + val dataSize = relation match { +case map: HashedRelation => + map.estimatedSize +case arr: Array[InternalRow] => --- End diff -- Yeah, both cases are used. The only one that won't match is `case _ => ...` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19394#discussion_r143226714 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala --- @@ -73,25 +73,37 @@ case class BroadcastExchangeExec( try { val beforeCollect = System.nanoTime() // Note that we use .executeCollect() because we don't want to convert data to Scala types - val input: Array[InternalRow] = child.executeCollect() - if (input.length >= 51200) { + val (numRows, input) = child.executeCollectIterator() + if (numRows >= 51200) { throw new SparkException( - s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows") + s"Cannot broadcast the table with more than 512 millions rows: $numRows rows") } + val beforeBuild = System.nanoTime() longMetric("collectTime") += (beforeBuild - beforeCollect) / 100 - val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum + + // Construct the relation. + val relation = mode.transform(input, Some(numRows)) + + val dataSize = relation match { +case map: HashedRelation => + map.estimatedSize +case arr: Array[InternalRow] => + arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum +case _ => + numRows * 512 // guess: each row is about 512 bytes --- End diff -- This won't cause a regression because all the broadcast modes return either an Array or a HashedRelation. The fallback is just in case there is a path that returns something different in the future. Maybe it would be better to throw an exception here. What do you think?
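The alternative floated above (failing loudly instead of guessing a size) could look roughly like the following sketch. It uses stand-in types rather than Spark's: `FakeHashedRelation` is a hypothetical placeholder for `HashedRelation`, and rows are modeled as plain byte arrays instead of `UnsafeRow`s.

```scala
object DataSizeSketch {
  // Hypothetical stand-in for a hashed relation that can report its own size.
  final case class FakeHashedRelation(estimatedSize: Long)

  // Size of a built relation; unknown relation types raise an error instead of
  // falling back to a per-row guess.
  def dataSize(relation: Any): Long = relation match {
    case map: FakeHashedRelation =>
      map.estimatedSize
    case arr: Array[Array[Byte]] =>
      // Rows are modeled as byte arrays; sum their lengths as Longs to avoid overflow.
      arr.map(_.length.toLong).sum
    case other =>
      throw new IllegalArgumentException(
        s"Unexpected relation type: ${other.getClass.getName}")
  }
}
```

The trade-off is that a future broadcast mode returning a new type would fail immediately, rather than silently reporting a size that may be far from the truth.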
[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/19394 Here's the error message: TestFailedException: 347.5272 was not greater than 1000
[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/19394 Yes, we've been running this in production for a few weeks now.
[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...
GitHub user rdblue reopened a pull request: https://github.com/apache/spark/pull/19394 [SPARK-22170][SQL] Reduce memory consumption in broadcast joins. ## What changes were proposed in this pull request? This updates the broadcast join code path to lazily decompress pages and iterate through UnsafeRows to prevent all rows from being held in memory while the broadcast table is being built. ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/spark broadcast-driver-memory Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19394.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19394 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/19394 @rxin, any idea why this would fail ConfigBehaviorSuite? I don't think the failure is related because that test doesn't use a broadcast join. Should I rebase on master?
[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...
Github user rdblue closed the pull request at: https://github.com/apache/spark/pull/19394
[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19394#discussion_r143060736 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -228,7 +228,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared testSparkPlanMetrics(df, 2, Map( 1L -> (("BroadcastHashJoin", Map( "number of output rows" -> 2L, -"avg hash probe (min, med, max)" -> "\n(1, 1, 1)" +"avg hash probe (min, med, max)" -> "\n(1.1, 1.1, 1.1)" --- End diff -- This test was incorrect. I debugged the metric and the value is consistently 1.1, with 9 lookups and 10 probes (10/9 = 1.11...). The test passed before only because the metrics check was hitting a [race condition](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala#L165-L169). If I change the expected values in master to 1.1, the tests still pass, which demonstrates that the check isn't really happening.
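The arithmetic behind the new expected value can be checked with a small sketch. The one-decimal formatting here is an assumption made for illustration, and `ProbeAverageSketch` and `avgProbe` are hypothetical names, not Spark's metric code.

```scala
object ProbeAverageSketch {
  // Average probes per lookup, rendered with one decimal place.
  // Locale.ROOT keeps the decimal separator a '.' regardless of the JVM default locale.
  def avgProbe(probes: Long, lookups: Long): String = {
    val average = probes.toDouble / lookups
    String.format(java.util.Locale.ROOT, "%.1f", java.lang.Double.valueOf(average))
  }
}
```

With 10 probes over 9 lookups the average is 1.11..., which renders as 1.1 rather than the 1 the test previously expected.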
[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19394#discussion_r142474766 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -280,13 +280,20 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ results.toArray } + private[spark] def executeCollectIterator(): (Long, Iterator[InternalRow]) = { +val countsAndBytes = getByteArrayRdd().collect() --- End diff -- I agree, but this is better than before. This keeps all of the rows in memory only in compressed form, and then streams through the compressed blocks. Before this patch, each row was copied into its own buffer while the compressed blocks were still held, so the driver held the rows both compressed and uncompressed at the same time. The uncompressed copy is what this fixes; we can follow up with something better that streams blocks from executors.
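The streaming behavior described above can be sketched without Spark. In this minimal example a "block" is a hypothetical stand-in for one partition's `(rowCount, bytes)` pair, with rows encoded as comma-separated UTF-8 text instead of compressed `UnsafeRow`s; `LazyDecodeSketch`, `encode`, `decode`, and `collectIterator` are illustrative names only.

```scala
object LazyDecodeSketch {
  // Assumption: a block is a row count plus the rows joined with ',' as UTF-8 bytes.
  def encode(rows: Seq[String]): (Long, Array[Byte]) =
    (rows.size.toLong, rows.mkString(",").getBytes("UTF-8"))

  // Counts how many blocks have been decoded so far, to make laziness observable.
  var decodedBlocks = 0

  def decode(bytes: Array[Byte]): Iterator[String] = {
    decodedBlocks += 1
    new String(bytes, "UTF-8").split(",").iterator
  }

  // The total is computed eagerly from the per-block counts, but rows are decoded
  // one block at a time, only when the returned iterator reaches that block.
  def collectIterator(blocks: Array[(Long, Array[Byte])]): (Long, Iterator[String]) = {
    val total = blocks.map(_._1).sum
    val rows = blocks.iterator.flatMap(block => decode(block._2))
    (total, rows)
  }
}
```

Because `Iterator.flatMap` is lazy, the row count is available up front while at most one block's decoded rows need to exist at a time, as long as the consumer does not retain them.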
[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/19394 Ideally, this would also use a TaskMemoryManager so the driver can spill results to disk instead of dying with an OOM. Is there any plan to add a memory manager for the driver?
[GitHub] spark pull request #19394: SPARK-22170: Reduce memory consumption in broadca...
GitHub user rdblue opened a pull request: https://github.com/apache/spark/pull/19394 SPARK-22170: Reduce memory consumption in broadcast joins. This updates the broadcast join code path to lazily decompress pages and iterate through UnsafeRows to prevent all rows from being held in memory while the broadcast table is being built. ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/spark broadcast-driver-memory Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19394.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19394 commit d82ffca19c84589dfcb87a81996fb3ce2e31e91a Author: Ryan Blue Date: 2017-08-11T00:41:56Z SPARK-22170: Reduce memory consumption in broadcast joins. This updates the broadcast join code path to lazily decompress pages and iterate through UnsafeRows to prevent all rows from being held in memory while the broadcast table is being built. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r141679351 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link DataWriter}. + * + * The writing procedure is: + * 1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. For each partition, create the data writer, and write the data of the partition with this + * writer. 
If all the data are written successfully, call {@link DataWriter#commit()}. If + * exception happens during the writing, call {@link DataWriter#abort()}. This step may repeat + * several times as Spark will retry failed tasks. + * 3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If + * some writers are aborted, or the job failed with an unknown reason, call {@link #abort()}. + * + * Spark may launch speculative tasks in step 2, so there may be more than one data writer working + * simultaneously for the same partition. Implementations should handle this case correctly, e.g., + * make sure only one data writer can commit successfully, or only admit one committed data writer + * and ignore/revert others at job level. + * + * Note that, data sources are responsible for providing transaction ability by implementing the + * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link DataWriter} correctly. + * The transaction here is Spark-level transaction, which may not be the underlying storage + * transaction. For example, Spark successfully write data to a Cassandra data source, but + * Cassandra may need some more time to reach consistency at storage level. + */ +@InterfaceStability.Evolving +public interface DataSourceV2Writer { + + /** + * Creates a writer factory which will be serialized and sent to executors. + */ + DataWriteFactory createWriterFactory(); + + /** + * Commits this writing job with a list of commit messages. The commit messages are collected from + * all data writers and are produced by {@link DataWriter#commit()}. + * + * Note that, one partition may have multiple committed data writers because of speculative tasks. + * Spark will pick the first successful one and get its commit message. 
Implementations should be + * aware of this and handle it correctly, e.g., have a mechanism to make sure only one data writer + * can commit successfully, or have a way to clean up the data of already committed writers. + */ + void commit(WriterCommitMessage[] messages); + + /** + * Aborts this writing job because some data writers are failed to write the records and aborted, + * or the Spark job failed with some unknown reasons. + * + * Note that some data writer may already be committed in this case, implementations should be + * aware of this and clean up the data. + */ + void abort(); --- End diff -- Sorry I missed that! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r141679290 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link WriteTask} that is returned by {@link #createWriteTask()}. + * + * The writing procedure is: + * 1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. 
For each partition, create a data writer with the write task, and write the data of the + * partition with this writer. If all the data are written successfully, call + * {@link DataWriter#commit()}. If exception happens during the writing, call + * {@link DataWriter#abort()}. This step may repeat several times as Spark will retry failed + * tasks. + * 3. Wait until all the writers/partitions are finished, i.e., either committed or aborted. If + * all partitions are written successfully, call {@link #commit(WriterCommitMessage[])}. If + * some partitions failed and aborted, call {@link #abort()}. + * + * Note that, data sources are responsible for providing transaction ability by implementing the + * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link DataWriter} correctly. + * The transaction here is Spark-level transaction, which may not be the underlying storage + * transaction. For example, Spark successfully write data to a Cassandra data source, but + * Cassandra may need some more time to reach consistency at storage level. + */ +@InterfaceStability.Evolving +public interface DataSourceV2Writer { + + /** + * Creates a write task which will be serialized and sent to executors. For each partition of the + * input data(RDD), there will be one write task to write the records. + */ + WriteTask createWriteTask(); + + /** + * Commits this writing job with a list of commit messages. The commit messages are collected from + * all data writers for this writing job and are produced by {@link DataWriter#commit()}. This + * also means all the data are written successfully and all data writers are committed. + */ + void commit(WriterCommitMessage[] messages); + + /** + * Aborts this writing job because some data writers are failed to write the records and aborted. + */ + void abort(); --- End diff -- Can you explain the situation you're talking about a bit more? I think Spark should pass everything it can to the abort. 
I agree that the abort here should be flexible and a best-effort, but there are situations where it can't know how to roll back everything that tasks did without those commit messages. There may be cases where Spark can't guarantee whether a commit was complete or not, but where it can, it should pass those commit messages. Say you had 100 successful write tasks of 400, but the executor died before it returned the 100th message (but after committing data) and that executor failure hit the max and Spark cancelled the job. Even though Spark has only 99 commit messages, it should still pass the ones that it can. In the S3 case, these are the only way the driver kno
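A rough sketch of the best-effort abort argued for in this comment, under stated assumptions: `CommitMessage`, `SketchWriter`, and the in-memory `store` are hypothetical stand-ins, and the `abort(messages)` signature is the proposal under discussion, not the interface in the quoted diff (where `abort()` takes no arguments).

```scala
import scala.collection.mutable

object AbortSketch {
  // Hypothetical commit message: the files a task reported as committed.
  final case class CommitMessage(taskId: Int, files: Seq[String])

  // Hypothetical job-level writer backed by an in-memory set of committed file names.
  class SketchWriter(store: mutable.Set[String]) {
    // Best-effort abort: roll back only what the received commit messages describe.
    def abort(messages: Seq[CommitMessage]): Unit =
      for (message <- messages; file <- message.files) store -= file
  }
}
```

Files from a task whose commit message never reached the driver stay behind in this sketch, which matches the best-effort framing: the writer can only undo the commits it was told about.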
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r141460252 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link DataWriter}. + * + * The writing procedure is: + * 1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. For each partition, create the data writer, and write the data of the partition with this + * writer. 
If all the data are written successfully, call {@link DataWriter#commit()}. If + * exception happens during the writing, call {@link DataWriter#abort()}. This step may repeat + * several times as Spark will retry failed tasks. + * 3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If + * some writers are aborted, or the job failed with an unknown reason, call {@link #abort()}. + * + * Spark may launch speculative tasks in step 2, so there may be more than one data writer working + * simultaneously for the same partition. Implementations should handle this case correctly, e.g., + * make sure only one data writer can commit successfully, or only admit one committed data writer + * and ignore/revert others at job level. + * + * Note that, data sources are responsible for providing transaction ability by implementing the + * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link DataWriter} correctly. + * The transaction here is Spark-level transaction, which may not be the underlying storage + * transaction. For example, Spark successfully write data to a Cassandra data source, but + * Cassandra may need some more time to reach consistency at storage level. + */ +@InterfaceStability.Evolving +public interface DataSourceV2Writer { + + /** + * Creates a writer factory which will be serialized and sent to executors. + */ + DataWriteFactory createWriterFactory(); + + /** + * Commits this writing job with a list of commit messages. The commit messages are collected from + * all data writers and are produced by {@link DataWriter#commit()}. + * + * Note that, one partition may have multiple committed data writers because of speculative tasks. + * Spark will pick the first successful one and get its commit message. 
Implementations should be + * aware of this and handle it correctly, e.g., have a mechanism to make sure only one data writer + * can commit successfully, or have a way to clean up the data of already committed writers. + */ + void commit(WriterCommitMessage[] messages); + + /** + * Aborts this writing job because some data writers are failed to write the records and aborted, + * or the Spark job failed with some unknown reasons. + * + * Note that some data writer may already be committed in this case, implementations should be + * aware of this and clean up the data. + */ + void abort(); --- End diff -- This still needs to be passed the WriterCommitMessages for committed tasks. (My previous
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r140389681 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link WriteTask} that is returned by {@link #createWriteTask()}. + * + * The writing procedure is: + * 1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. 
For each partition, create a data writer with the write task, and write the data of the --- End diff -- Maybe you're right. If a source supports rollback for sequential tasks, then that might mean it has to support rollback for concurrent tasks. I was originally thinking of a case like JDBC without transactions, where an insert actually creates the rows and rolling back concurrent tasks would delete rows from the other task. But in that case, inserts are idempotent so it wouldn't matter. I'm not sure if there's a case where you can (or would) implement rolling back, but can't handle concurrency. Let's just leave it until someone has a use case for it.
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r140371372 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link WriteTask} that is returned by {@link #createWriteTask()}. + * + * The writing procedure is: + * 1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. 
For each partition, create a data writer with the write task, and write the data of the --- End diff -- Maybe Spark should be able to disable speculative tasks at runtime instead of requiring data sources to roll back. I'd prefer that, but I don't think it is something that needs to change now. If and when that is supported, we can always add an interface that allows data sources to communicate a lack of support for speculation.
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r140018805 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java --- @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * A data writer returned by {@link WriteTask#createWriter(int, int, int)} and is responsible for + * writing data for an input RDD partition. + * + * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data + * source writers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source + * writers that mix in {@link SupportsWriteUnsafeRow}. + */ +@InterfaceStability.Evolving +public interface DataWriter { + + void write(T record); --- End diff -- I thought that might be the case. :) But, we can't count on not throwing exceptions when writing so it would be good to also document what happens when a writer does throw an exception here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
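For reference, the task-level behavior that the class Javadoc describes (commit once all records are written, abort if an exception is thrown) could be sketched roughly as below. This is only an illustration under assumptions: `DataWriter` is a local stand-in that mirrors the quoted interface, the commit message is simplified to a `String`, and `ListWriter`, `WriteTaskRunner`, and `runTask` are hypothetical names that do not appear in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Local stand-in for the DataWriter interface quoted in the diff (assumption: simplified).
interface DataWriter<T> {
  void write(T record);
  String commit();   // the quoted API uses WriterCommitMessage; a String keeps the sketch small
  void abort();
}

// Hypothetical writer that buffers records and rejects null records.
class ListWriter implements DataWriter<String> {
  final List<String> buffered = new ArrayList<>();
  boolean aborted = false;

  @Override
  public void write(String record) {
    if (record == null) {
      throw new IllegalArgumentException("null record");
    }
    buffered.add(record);
  }

  @Override
  public String commit() {
    return "committed " + buffered.size() + " records";
  }

  @Override
  public void abort() {
    buffered.clear();
    aborted = true;
  }
}

class WriteTaskRunner {
  // Hypothetical task body: write every record, then commit. If write() or commit()
  // throws, abort the writer and rethrow so the attempt is reported as failed.
  static <T> String runTask(DataWriter<T> writer, Iterator<T> records) {
    try {
      while (records.hasNext()) {
        writer.write(records.next());
      }
      return writer.commit();
    } catch (RuntimeException e) {
      writer.abort();
      throw e;
    }
  }

  public static void main(String[] args) {
    System.out.println(runTask(new ListWriter(), Arrays.asList("a", "b").iterator()));
  }
}
```

Documenting `write` along these lines would make it explicit that an exception leads to `abort()` on the same writer and that the exception still propagates to the caller.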
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r140017014 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Command.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils + +case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: LogicalPlan) + extends RunnableCommand { --- End diff -- It isn't just the UI though. 
I've seen cases where InsertIntoHadoopFsRelationCommand is run inside a CreateDataSourceTableAsSelectCommand, and their enforcement of write modes depends on one another. Our internal S3 committer updates partition information and handles partition-level conflicts, but that requires that the table exists before the write (to check what partitions already exist). When we moved table creation in the CTAS command, it broke the insert-into command, even though the two should be separate. While it's convenient to have a logical plan and a physical plan together, I think this ends up getting misused. That's why I'm advocating changing how we use RunnableCommand. To fix it, we should introduce a node with a command and logical plan, so we can optimize the entire plan and run the command at the right time. Clearly, this isn't a blocker for this PR; I just want to mention that I see this pattern causing a lot of problems every time we pull in a new Spark version.
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r139841435 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Command.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils + +case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: LogicalPlan) + extends RunnableCommand { --- End diff -- I know similar tasks do the same, but this should not implement `RunnableCommand`. 
I'm not sure what the original intent for it was, but I think `RunnableCommand` should be used for small tasks that are carried out on the driver, like DDL. Using `RunnableCommand` in cases like this, where a job needs to run, ends up effectively linking a logical plan into a physical plan, which has caused a few messy issues. One example is the problem where the Spark SQL tab doesn't show the entire operation and only shows the outer command without metrics.
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r139839037 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java --- @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * A data writer returned by {@link WriteTask#createWriter(int, int, int)} and is responsible for + * writing data for an input RDD partition. + * + * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data + * source writers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source + * writers that mix in {@link SupportsWriteUnsafeRow}. + */ +@InterfaceStability.Evolving +public interface DataWriter { + + void write(T record); --- End diff -- What happens if this throws an exception? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r139838908 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link WriteTask} that is returned by {@link #createWriteTask()}. + * + * The writing procedure is: + * 1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. 
For each partition, create a data writer with the write task, and write the data of the + * partition with this writer. If all the data are written successfully, call + * {@link DataWriter#commit()}. If exception happens during the writing, call + * {@link DataWriter#abort()}. This step may repeat several times as Spark will retry failed + * tasks. + * 3. Wait until all the writers/partitions are finished, i.e., either committed or aborted. If + * all partitions are written successfully, call {@link #commit(WriterCommitMessage[])}. If + * some partitions failed and aborted, call {@link #abort()}. + * + * Note that, data sources are responsible for providing transaction ability by implementing the + * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link DataWriter} correctly. + * The transaction here is Spark-level transaction, which may not be the underlying storage + * transaction. For example, Spark successfully write data to a Cassandra data source, but + * Cassandra may need some more time to reach consistency at storage level. + */ +@InterfaceStability.Evolving +public interface DataSourceV2Writer { + + /** + * Creates a write task which will be serialized and sent to executors. For each partition of the + * input data(RDD), there will be one write task to write the records. + */ + WriteTask createWriteTask(); + + /** + * Commits this writing job with a list of commit messages. The commit messages are collected from + * all data writers for this writing job and are produced by {@link DataWriter#commit()}. This + * also means all the data are written successfully and all data writers are committed. + */ + void commit(WriterCommitMessage[] messages); + + /** + * Aborts this writing job because some data writers are failed to write the records and aborted. + */ + void abort(); --- End diff -- Should this accept the commit messages for committed tasks, or will tasks be aborted? I'm thinking of the case where you're writing to S3. 
Say a data source writes all attempt files to the final locations, then removes any attempts that are aborted. If the job aborts with some tasks that have already committed, then either this method should have the option of cleaning up those files (passed in the commit messages) or all of the tasks should be individually aborted. I'd prefer to have this abort clean up successful/committed tasks because the logic may be different.
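A minimal sketch of the first option, a job-level abort that receives the commit messages of tasks that already committed, might look like the following. Everything here is hypothetical and only illustrates the idea: `FileCommitMessage`, `FileJobWriter`, and the `abort(FileCommitMessage[])` signature are not part of the PR, and an in-memory set stands in for the S3 bucket.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical commit message that records the files a committed task attempt produced.
class FileCommitMessage {
  final List<String> files;

  FileCommitMessage(String... files) {
    this.files = Arrays.asList(files);
  }
}

// Hypothetical job-level writer whose abort receives the messages of committed tasks.
class FileJobWriter {
  // Stands in for the object store: paths that currently exist in the final location.
  final Set<String> store = new HashSet<>();

  // Job commit: nothing to move, because tasks wrote directly to the final locations.
  void commit(FileCommitMessage[] messages) { }

  // Job abort: delete the output of every task attempt that had already committed.
  void abort(FileCommitMessage[] committed) {
    for (FileCommitMessage message : committed) {
      store.removeAll(message.files);
    }
  }
}

class AbortCleanupDemo {
  public static void main(String[] args) {
    FileJobWriter writer = new FileJobWriter();
    writer.store.addAll(Arrays.asList("part-0", "part-1", "existing"));
    // Tasks 0 and 1 committed before the job failed, so their files are cleaned up.
    writer.abort(new FileCommitMessage[] {
        new FileCommitMessage("part-0"), new FileCommitMessage("part-1")});
    System.out.println(writer.store);
  }
}
```

With a no-argument `abort()`, the writer in this sketch would have no way to know which final-location files belong to the failed job.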
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r139838459 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link WriteTask} that is returned by {@link #createWriteTask()}. + * + * The writing procedure is: + * 1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. 
For each partition, create a data writer with the write task, and write the data of the + * partition with this writer. If all the data are written successfully, call + * {@link DataWriter#commit()}. If exception happens during the writing, call + * {@link DataWriter#abort()}. This step may repeat several times as Spark will retry failed + * tasks. + * 3. Wait until all the writers/partitions are finished, i.e., either committed or aborted. If + * all partitions are written successfully, call {@link #commit(WriterCommitMessage[])}. If + * some partitions failed and aborted, call {@link #abort()}. + * + * Note that, data sources are responsible for providing transaction ability by implementing the + * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link DataWriter} correctly. + * The transaction here is Spark-level transaction, which may not be the underlying storage + * transaction. For example, Spark successfully write data to a Cassandra data source, but + * Cassandra may need some more time to reach consistency at storage level. + */ +@InterfaceStability.Evolving +public interface DataSourceV2Writer { + + /** + * Creates a write task which will be serialized and sent to executors. For each partition of the + * input data(RDD), there will be one write task to write the records. + */ + WriteTask createWriteTask(); + + /** + * Commits this writing job with a list of commit messages. The commit messages are collected from + * all data writers for this writing job and are produced by {@link DataWriter#commit()}. This + * also means all the data are written successfully and all data writers are committed. --- End diff -- I think this should state the guarantees when this method is called: * One and only one attempt for every task has committed successfully * Messages contains the commit message from every committed task attempt, which is no more than one per task. 
* All other attempts have been successfully aborted (is this a guarantee, or just that aborts have been attempted?)
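To make the first two guarantees concrete, here is a rough sketch of driver-side bookkeeping that admits at most one committed attempt per task and only allows the job commit once every task has one. `CommitCoordinatorSketch` and its methods are hypothetical names for illustration; this is not how the PR or Spark implements it, and commit messages are simplified to strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical driver-side bookkeeping: keep at most one commit message per task.
class CommitCoordinatorSketch {
  private final int numTasks;
  private final Map<Integer, String> firstCommit = new LinkedHashMap<>();

  CommitCoordinatorSketch(int numTasks) {
    this.numTasks = numTasks;
  }

  // Records the commit of a task attempt. Returns false if another attempt of the same
  // task has already committed, in which case the caller should abort this attempt.
  boolean recordCommit(int taskId, String message) {
    if (taskId < 0 || taskId >= numTasks) {
      throw new IllegalArgumentException("unknown task " + taskId);
    }
    return firstCommit.putIfAbsent(taskId, message) == null;
  }

  // True only when every task has exactly one committed attempt.
  boolean readyForJobCommit() {
    return firstCommit.size() == numTasks;
  }

  // The messages that would be handed to the job-level commit: one per task.
  List<String> messages() {
    return new ArrayList<>(firstCommit.values());
  }
}

class CommitGuaranteeDemo {
  public static void main(String[] args) {
    CommitCoordinatorSketch coordinator = new CommitCoordinatorSketch(2);
    coordinator.recordCommit(0, "task 0, attempt 0");
    coordinator.recordCommit(0, "task 0, attempt 1 (speculative)"); // rejected
    coordinator.recordCommit(1, "task 1, attempt 0");
    System.out.println(coordinator.readyForJobCommit() + " " + coordinator.messages());
  }
}
```

The third point in the list stays open in this sketch: a rejected attempt is only told to abort, and nothing here verifies that the abort succeeded.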
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r139836068 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link WriteTask} that is returned by {@link #createWriteTask()}. + * + * The writing procedure is: + * 1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. 
For each partition, create a data writer with the write task, and write the data of the --- End diff -- How does this handle speculative execution? This description makes it sound like attempts are only run serially. I'd like to have an interface that signals support for concurrent tasks, for data sources that act like the direct committer and can't handle speculation.
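One possible shape for such a signal, following the mix-in style the Javadoc already describes for optimization interfaces, is a marker interface that a writer implements when concurrent attempts are safe. This is a sketch only: `SupportsConcurrentAttempts`, `StagingWriter`, `DirectWriter`, and `SpeculationCheck` are hypothetical names, `DataSourceV2Writer` is reduced to an empty local stand-in, and whether Spark could act on the result at runtime is a separate question.

```java
// Stand-in for the job-level writer interface in the diff (assumption: methods omitted).
interface DataSourceV2Writer { }

// Hypothetical mix-in: implemented by writers that tolerate concurrent attempts of one task.
interface SupportsConcurrentAttempts { }

// Hypothetical writer that stages output per attempt, so speculation is safe.
class StagingWriter implements DataSourceV2Writer, SupportsConcurrentAttempts { }

// Hypothetical writer that behaves like a direct committer and cannot handle speculation.
class DirectWriter implements DataSourceV2Writer { }

class SpeculationCheck {
  // Speculative attempts are allowed only if they are enabled and the writer opts in.
  static boolean allowSpeculation(DataSourceV2Writer writer, boolean speculationEnabled) {
    return speculationEnabled && writer instanceof SupportsConcurrentAttempts;
  }

  public static void main(String[] args) {
    System.out.println(allowSpeculation(new StagingWriter(), true));
    System.out.println(allowSpeculation(new DirectWriter(), true));
  }
}
```

An opt-in marker keeps the default conservative: a writer that says nothing is treated as unable to handle concurrent attempts.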
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r139835603 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link WriteTask} that is returned by {@link #createWriteTask()}. + * + * The writing procedure is: + * 1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. 
For each partition, create a data writer with the write task, and write the data of the + * partition with this writer. If all the data are written successfully, call + * {@link DataWriter#commit()}. If exception happens during the writing, call + * {@link DataWriter#abort()}. This step may repeat several times as Spark will retry failed + * tasks. + * 3. Wait until all the writers/partitions are finished, i.e., either committed or aborted. If + * all partitions are written successfully, call {@link #commit(WriterCommitMessage[])}. If + * some partitions failed and aborted, call {@link #abort()}. + * + * Note that, data sources are responsible for providing transaction ability by implementing the + * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link DataWriter} correctly. + * The transaction here is Spark-level transaction, which may not be the underlying storage + * transaction. For example, Spark successfully write data to a Cassandra data source, but + * Cassandra may need some more time to reach consistency at storage level. + */ +@InterfaceStability.Evolving +public interface DataSourceV2Writer { + + /** + * Creates a write task which will be serialized and sent to executors. For each partition of the + * input data(RDD), there will be one write task to write the records. + */ + WriteTask createWriteTask(); --- End diff -- I think it's confusing to have only one write "task" that is serialized and used everywhere. It is implicitly copied by the serialization into multiple distinct tasks. Is there a better name for it? Maybe call the `DataWriter` the `WriteTask` and serialize something with a better name? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
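To illustrate the naming concern, the sketch below separates the single serializable object created on the driver from the per-partition writers it produces after being copied to each task. A later revision of this diff in the same thread uses `createWriterFactory()` for the driver-side method; the class names here (`WriterFactory`, `PartitionWriter`, `FactoryShippingDemo`) and the two-argument `createWriter` are assumptions made for the example, and plain Java serialization stands in for shipping the object to an executor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical per-partition writer: created on the executor, never serialized.
class PartitionWriter {
  final String path;

  PartitionWriter(String path) {
    this.path = path;
  }
}

// Hypothetical serializable factory: one instance on the driver, one copy per task.
class WriterFactory implements Serializable {
  private static final long serialVersionUID = 1L;
  private final String basePath;

  WriterFactory(String basePath) {
    this.basePath = basePath;
  }

  PartitionWriter createWriter(int partitionId, int attemptNumber) {
    return new PartitionWriter(basePath + "/part-" + partitionId + "-attempt-" + attemptNumber);
  }
}

class FactoryShippingDemo {
  // Round-trips the factory through Java serialization, as a stand-in for sending it
  // to an executor. Checked exceptions are wrapped to keep call sites simple.
  static WriterFactory ship(WriterFactory factory) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(factory);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (WriterFactory) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("could not copy the factory", e);
    }
  }

  public static void main(String[] args) {
    WriterFactory onDriver = new WriterFactory("/tmp/out");
    WriterFactory onExecutor = ship(onDriver); // a distinct copy, not the same object
    System.out.println(onExecutor.createWriter(3, 0).path);
  }
}
```

Calling the shipped object a factory makes the implicit copying less surprising, since each deserialized copy only creates writers and holds no per-task state of its own.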
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r139834571 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java --- @@ -30,9 +30,8 @@ /** * Creates a {@link DataSourceV2Reader} to scan the data from this data source. * - * @param options the options for this data source reader, which is an immutable case-insensitive - *string-to-string map. - * @return a reader that implements the actual read logic. + * @param options the options for the returned data source reader, which is an immutable + *case-insensitive string-to-string map. --- End diff -- It would make this much easier to review if changes to the read path were taken out and committed in a follow-up to #19136. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r139832973 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.WriteSupport; +import org.apache.spark.sql.types.StructType; + +/** + * A data source writer that is returned by + * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}. + * It can mix in various writing optimization interfaces to speed up the data saving. The actual + * writing logic is delegated to {@link WriteTask} that is returned by {@link #createWriteTask()}. + * + * The writing procedure is: + * 1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the + * partitions of the input data(RDD). + * 2. 
For each partition, create a data writer with the write task, and write the data of the + * partition with this writer. If all the data are written successfully, call + * {@link DataWriter#commit()}. If exception happens during the writing, call + * {@link DataWriter#abort()}. This step may repeat several times as Spark will retry failed + * tasks. + * 3. Wait until all the writers/partitions are finished, i.e., either committed or aborted. If + * all partitions are written successfully, call {@link #commit(WriterCommitMessage[])}. If + * some partitions failed and aborted, call {@link #abort()}. --- End diff -- The main reason why I wanted a separate SPIP for the write path was this point in the doc: > Ideally partitioning/bucketing concept should not be exposed in the Data Source API V2, because they are just techniques for data skipping and pre-partitioning. However, these 2 concepts are already widely used in Spark, e.g. DataFrameWriter.partitionBy and DDL syntax like ADD PARTITION. To be consistent, we need to add partitioning/bucketing to Data Source V2, so that the implementations can be able to specify partitioning/bucketing for read/write. There's a lot in there that's worth thinking about and possibly changing: 1. Ideally, the DataSourceV2 API wouldn't support bucketing/partitioning 2. The current DataFrameWriter API is what we should continue to support 3. Implementations should supply bucketing and partitioning for writes because of 2 **Bucketing/partitioning**: It comes down to the level at which this API is going to be used. It looks like this API currently ignores bucketing and partitioning (unless my read through was too quick). I think I agree that in the long term that's a good thing, but we need ways for a data source to tell Spark about its requirements for incoming data. In the current version, it looks like Spark would know how to prepare data for writers outside of this API (rather than including support as suggested by point 3). 
When writing a partitioned table, Spark would get the partitioning from the table definition in the metastore and automatically sort by partition columns. Is that right? I'd like to move the data store's requirements behind this API. For example, writing to HBase files directly requires sorting by key first. We don't want to do the sort in the writer because it may duplicate work (and isn't captured in the physical plan), and we also don't want to require Spark to know about the requirements of the HBase data store, or any other specific implementation. **DataFrameWriter API**: I'd like to talk about separating the API for table definitions and writes, but not necessarily as
[GitHub] spark pull request #19136: [SPARK-15689][SQL] data source v2 read path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19136#discussion_r138681562 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.io.Closeable; + +/** + * A data reader returned by a read task and is responsible for outputting data for a RDD partition. + */ +public interface DataReader extends Closeable { --- End diff -- The initialization is done when creating this `DataReader` from a `ReadTask`. That ensures that the initialization happens (easy to forget `open()`) and simplifies the checks that need to be done because `DataReader` can't exist otherwise.
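A minimal sketch of the design point in the comment above, not the actual Spark interfaces: the reader is fully initialized by the time the task's factory method returns, so there is no separate `open()` call to forget. The generic parameter, the `next()`/`get()` methods, `createReader()`, and the `ListReadTask` class are hypothetical names used only for illustration.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-in for the reader interface under review.
interface DataReader<T> extends Closeable {
  boolean next();   // advance to the next record; false when exhausted
  T get();          // the current record
  @Override
  void close();     // narrowed: no checked exception in this sketch
}

// Hypothetical stand-in for the read task that creates readers.
interface ReadTask<T> {
  // All initialization happens here, so a DataReader cannot exist un-opened.
  DataReader<T> createReader();
}

final class ListReadTask<T> implements ReadTask<T> {
  private final List<T> rows;

  ListReadTask(List<T> rows) {
    this.rows = rows;
  }

  @Override
  public DataReader<T> createReader() {
    // The "open" work (here: obtaining an iterator) is done before returning.
    final Iterator<T> it = rows.iterator();
    return new DataReader<T>() {
      private T current;

      @Override
      public boolean next() {
        if (!it.hasNext()) {
          return false;
        }
        current = it.next();
        return true;
      }

      @Override
      public T get() {
        return current;
      }

      @Override
      public void close() {
        // Nothing to release in this in-memory sketch.
      }
    };
  }
}
```

Because the reader extends `Closeable`, callers can use try-with-resources and never need a state check such as "was `open()` called?".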
[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19136#discussion_r138123207 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.v2.reader.downward.{CatalystFilterPushDownSupport, ColumnPruningSupport, FilterPushDownSupport} + +object DataSourceV2Strategy extends Strategy { + // TODO: write path + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { +case PhysicalOperation(projects, filters, DataSourceV2Relation(output, reader)) => + val attrMap = AttributeMap(output.zip(output)) + + val projectSet = AttributeSet(projects.flatMap(_.references)) + val filterSet = AttributeSet(filters.flatMap(_.references)) + + // Match original case of attributes. + // TODO: nested fields pruning + val requiredColumns = (projectSet ++ filterSet).toSeq.map(attrMap) --- End diff -- It seems reasonable to only request the ones that will be used, or that have residuals after pushing filters.
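One way to read the suggestion above, as a hedged sketch rather than the planner's real logic: request the projected columns plus only those filter columns whose filters are still evaluated by Spark after pushdown (the residuals). `SimpleFilter`, its `fullyPushed` flag, and `ColumnPruning` are hypothetical names invented for this example; the real strategy works on Catalyst attributes, not strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical filter model: the columns it references and whether the
// data source handles it completely (so Spark need not re-evaluate it).
final class SimpleFilter {
  final List<String> references;
  final boolean fullyPushed;

  SimpleFilter(List<String> references, boolean fullyPushed) {
    this.references = references;
    this.fullyPushed = fullyPushed;
  }
}

final class ColumnPruning {
  private ColumnPruning() {}

  // Projected columns, plus columns needed by residual (not fully pushed) filters.
  static List<String> requiredColumns(List<String> projected, List<SimpleFilter> filters) {
    Set<String> required = new LinkedHashSet<>(projected);
    for (SimpleFilter filter : filters) {
      if (!filter.fullyPushed) {
        required.addAll(filter.references);
      }
    }
    return new ArrayList<>(required);
  }
}
```

Columns referenced only by fully pushed filters are dropped from the request, since nothing above the scan needs them.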
[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19136#discussion_r137829674 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader +import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport + +case class DataSourceV2Relation( +output: Seq[AttributeReference], +reader: DataSourceV2Reader) extends LeafNode { + + override def computeStats(): Statistics = reader match { +case r: StatisticsSupport => Statistics(sizeInBytes = r.getStatistics.sizeInBytes()) +case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes) + } +} + +object DataSourceV2Relation { + def apply(reader: DataSourceV2Reader): DataSourceV2Relation = { +new DataSourceV2Relation(reader.readSchema().toAttributes, reader) --- End diff -- I think we should add a way to provide partition values outside of the columnar reader. It wouldn't be too difficult to add a method on `ReadTask` that returns them, then create a joined row in the scan exec. Otherwise, this requires a lot of wasted memory for a scan.
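A rough sketch of the idea in the comment above, under stated assumptions: the read task exposes the values that are constant for all of its rows, and the scan presents each data row joined with that single shared array instead of copying the constants into every row. `PartitionedReadTask`, `partitionValues()`, `dataRows()`, `JoinedView`, and `ScanSketch` are all hypothetical names; none of them is part of the API in this pull request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical read task that exposes per-task constants separately from row data.
interface PartitionedReadTask {
  Object[] partitionValues();   // identical for every row produced by this task
  List<Object[]> dataRows();    // stand-in for the reader's output
}

// A read-only view over a data row followed by the shared partition values.
final class JoinedView {
  private final Object[] data;
  private final Object[] partition;

  JoinedView(Object[] data, Object[] partition) {
    this.data = data;
    this.partition = partition;
  }

  int numFields() {
    return data.length + partition.length;
  }

  Object get(int ordinal) {
    return ordinal < data.length ? data[ordinal] : partition[ordinal - data.length];
  }
}

final class ScanSketch {
  private ScanSketch() {}

  static List<JoinedView> scan(PartitionedReadTask task) {
    Object[] partition = task.partitionValues();  // fetched once per task
    List<JoinedView> out = new ArrayList<>();
    for (Object[] row : task.dataRows()) {
      // Each view references the same partition array; nothing is copied per row.
      out.add(new JoinedView(row, partition));
    }
    return out;
  }
}
```

The memory saving comes from sharing one partition array per task; a real scan would likely reuse a single mutable joined row rather than allocate a view per record.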
[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19136#discussion_r137597910 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader +import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport + +case class DataSourceV2Relation( +output: Seq[AttributeReference], +reader: DataSourceV2Reader) extends LeafNode { + + override def computeStats(): Statistics = reader match { +case r: StatisticsSupport => Statistics(sizeInBytes = r.getStatistics.sizeInBytes()) +case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes) + } +} + +object DataSourceV2Relation { + def apply(reader: DataSourceV2Reader): DataSourceV2Relation = { +new DataSourceV2Relation(reader.readSchema().toAttributes, reader) --- End diff -- Are you saying that partition pruning isn't delegated to the data source in this interface? I was just looking into how the data source should provide partition data, or at least fields that are the same for all rows in a `ReadTask`. It would be nice to have a way to pass those up instead of materializing them in each `UnsafeRow`.
[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19136#discussion_r137591425 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader +import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport + +case class DataSourceV2Relation( +output: Seq[AttributeReference], +reader: DataSourceV2Reader) extends LeafNode { + + override def computeStats(): Statistics = reader match { +case r: StatisticsSupport => Statistics(sizeInBytes = r.getStatistics.sizeInBytes()) +case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes) + } +} + +object DataSourceV2Relation { + def apply(reader: DataSourceV2Reader): DataSourceV2Relation = { +new DataSourceV2Relation(reader.readSchema().toAttributes, reader) --- End diff -- Is this the right schema? 
The docs for `readSchema` say it is the result of pushdown and projection, which doesn't seem appropriate for a `Relation`. Does relation represent a table that can be filtered and projected, or does it represent a single read? At least in the Hive read path, it's a table.
[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19136#discussion_r137587367 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SchemaRequiredDataSourceV2.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.types.StructType; + +/** + * A variant of `DataSourceV2` which requires users to provide a schema when reading data. A data + * source can inherit both `DataSourceV2` and `SchemaRequiredDataSourceV2` if it supports both schema + * inference and user-specified schemas. + */ +public interface SchemaRequiredDataSourceV2 { + + /** + * Create a `DataSourceV2Reader` to scan the data for this data source. + * + * @param schema the full schema of this data source reader. Full schema usually maps to the + * physical schema of the underlying storage of this data source reader, e.g. + * parquet files, JDBC tables, etc, while this reader may not read data with full --- End diff -- Maybe update the doc here, since JDBC sources and Parquet files probably shouldn't implement this. 
CSV and JSON are the examples that come to mind for sources that require a schema.
[GitHub] spark issue #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/19136 Thanks for pinging me. I left comments on the older PR, since other discussion was already there. If you'd prefer comments here, just let me know.
[GitHub] incubator-toree pull request #128: [TOREE-425] Force sparkContext initializa...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-toree/pull/128#discussion_r127859204 --- Diff: kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala --- @@ -82,6 +84,8 @@ trait StandardComponentInitialization extends ComponentInitialization { initializePlugins(config, pluginManager) +initializeSparkContext(config, kernel) --- End diff -- Should this be here, or in the kernel's initialization? I think we eventually want Toree to not require Spark, in which case we would have a SparkKernel and a regular Kernel. Then it would be the responsibility of the SparkKernel to detect that the application is in cluster mode and initialize. Doing this sooner rather than later would avoid more changes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-toree pull request #128: [TOREE-425] Force sparkContext initializa...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-toree/pull/128#discussion_r127858852 --- Diff: kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala --- @@ -94,6 +98,18 @@ trait StandardComponentInitialization extends ComponentInitialization { } + def initializeSparkContext(config:Config, kernel:Kernel) = { +// TOREE:425 Spark cluster mode requires a context to be initialized before +// it register the application as Running +if ( SparkUtils.isSparkClusterMode(kernel.sparkConf) ) { --- End diff -- Nit: Are the spaces needed? I think this is non-standard for Scala or Java projects.
[GitHub] incubator-toree pull request #128: [TOREE-425] Force sparkContext initializa...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-toree/pull/128#discussion_r127858771 --- Diff: kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala --- @@ -20,8 +20,11 @@ package org.apache.toree.kernel.api import java.io.{InputStream, PrintStream} import java.net.URI import java.util.concurrent.{ConcurrentHashMap, TimeUnit, TimeoutException} + --- End diff -- Nit: unnecessary changes in imports make backports and branch maintenance harder. I'd prefer not to have these changes unless there is a stated style that this change conforms to.
[GitHub] incubator-toree pull request #128: [TOREE-425] Force sparkContext initializa...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-toree/pull/128#discussion_r127858615 --- Diff: kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala --- @@ -414,13 +417,15 @@ class Kernel ( Await.result(sessionFuture, Duration(100, TimeUnit.MILLISECONDS)) } catch { case timeout: TimeoutException => -// getting the session is taking a long time, so assume that Spark -// is starting and print a message -display.content( - MIMEType.PlainText, "Waiting for a Spark session to start...") +// in cluster mode, the sparkContext is forced to initialize +if (SparkUtils.isSparkClusterMode(defaultSparkConf) == false) { --- End diff -- Instead of preventing the message from being sent, I think this should update the logic so that this uses the second case. That case just creates a new context without the futures or Await calls.
[GitHub] spark issue #18450: [SPARK-21238][SQL] allow nested SQL execution
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/18450 I think it is good that this would no longer throw exceptions at runtime. Is the purpose of not allowing nested executions to minimize the queries shown in the UI? If that's the only purpose then I agree that just eliminating the empty ones is a good strategy.
[GitHub] spark pull request #18419: [SPARK-20213][SQL][follow-up] introduce SQLExecut...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/18419#discussion_r124331858 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala --- @@ -100,7 +105,9 @@ object SQLExecution { // all accumulator metrics will be 0. It will confuse people if we show them in Web UI. // // A real case is the `DataFrame.count` method. - throw new IllegalArgumentException(s"$EXECUTION_ID_KEY is already set") + throw new IllegalArgumentException(s"$EXECUTION_ID_KEY is already set, please wrap your " + --- End diff -- The problem is that this is an easy error to hit and it shouldn't affect end users. It is better to warn that something is wrong than to fail a job that would otherwise succeed for a bug in Spark. As for the error message, I think it is fine if we intend to leave it in. I'd just rather not fail user jobs here. I assume that DataSource developers will have tests, but probably not ones that know to set spark.testing. Is there a better way to detect test cases?
[GitHub] spark issue #18419: [SPARK-20213][SQL][follow-up] introduce SQLExecution.ign...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/18419 One minor comment, otherwise +1.
[GitHub] spark pull request #18419: [SPARK-20213][SQL][follow-up] introduce SQLExecut...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/18419#discussion_r124058801 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala --- @@ -100,7 +105,9 @@ object SQLExecution { // all accumulator metrics will be 0. It will confuse people if we show them in Web UI. // // A real case is the `DataFrame.count` method. - throw new IllegalArgumentException(s"$EXECUTION_ID_KEY is already set") + throw new IllegalArgumentException(s"$EXECUTION_ID_KEY is already set, please wrap your " + --- End diff -- Nested execution is a developer problem, not a user problem. That's why the [original PR](https://github.com/apache/spark/pull/17540/files#diff-ab49028253e599e6e74cc4f4dcb2e3a8R127) did not throw `IllegalArgumentException` outside of testing. I think that should still be how this is handled. If this is thrown at runtime, adding the text about `ignoreNestedExecutionId` is confusing for users, who can't (or shouldn't) set it. A comment is more appropriate if users will see this message. If the change to only throw during testing is added, then I think it is fine to add the text to the exception.
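The behavior argued for above (fail loudly in tests, only warn in production) can be sketched as follows. This is an illustration under assumptions, not the code in `SQLExecution.scala`: detecting tests through the `spark.testing` system property follows the property named in the earlier comment on this pull request, and `NestedExecutionCheck`, `isTesting()`, and `checkNotNested()` are hypothetical names.

```java
import java.util.logging.Logger;

final class NestedExecutionCheck {
  private static final Logger LOG = Logger.getLogger(NestedExecutionCheck.class.getName());

  // Assumed to mirror the key referenced as EXECUTION_ID_KEY in the diff.
  static final String EXECUTION_ID_KEY = "spark.sql.execution.id";

  private NestedExecutionCheck() {}

  // Assumption: a test run is detected by the presence of the spark.testing property.
  static boolean isTesting() {
    return System.getProperty("spark.testing") != null;
  }

  // Returns true when a nested execution was detected and tolerated.
  static boolean checkNotNested(String existingExecutionId) {
    if (existingExecutionId == null) {
      return false;  // not nested, nothing to report
    }
    if (isTesting()) {
      // Developers see a hard failure in their test suites.
      throw new IllegalArgumentException(EXECUTION_ID_KEY + " is already set");
    }
    // End users get a warning; their job continues.
    LOG.warning(EXECUTION_ID_KEY + " is already set; continuing with the outer execution");
    return true;
  }
}
```

The open question from the thread still applies: a data source author's tests would only hit the throwing branch if they know to set the property.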
[GitHub] incubator-toree pull request #123: TOREE-415: Cancel Spark jobs when interru...
Github user rdblue closed the pull request at: https://github.com/apache/incubator-toree/pull/123
[GitHub] incubator-toree pull request #123: TOREE-415: Cancel Spark jobs when interru...
GitHub user rdblue reopened a pull request: https://github.com/apache/incubator-toree/pull/123 TOREE-415: Cancel Spark jobs when interrupted. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/incubator-toree TOREE-415-interrupt-spark-jobs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-toree/pull/123.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #123
[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...
Github user rdblue closed the pull request at: https://github.com/apache/incubator-toree/pull/104
[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-toree/pull/104#discussion_r122049346 --- Diff: kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerTransformer.scala --- @@ -43,7 +43,7 @@ class BrokerTransformer { import scala.concurrent.ExecutionContext.Implicits.global futureResult - .map(results => (Results.Success, Left(results))) + .map(results => (Results.Success, Left(Map("text/plain" -> results --- End diff -- Opened https://issues.apache.org/jira/browse/TOREE-418
[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-toree/pull/104#discussion_r122048356 --- Diff: kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/relay/ExecuteRequestRelaySpec.scala --- @@ -88,7 +89,7 @@ class ExecuteRequestRelaySpec extends TestKit( // Expected does not actually match real return of magic, which // is a tuple of ExecuteReply and ExecuteResult val expected = new ExecuteAborted() -interpreterActorProbe.expectMsgClass( +interpreterActorProbe.expectMsgClass(max = Duration(5, TimeUnit.SECONDS), --- End diff -- Opened https://issues.apache.org/jira/browse/TOREE-417
[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-toree/pull/104#discussion_r122047867 --- Diff: kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/stream/KernelInputStreamSpec.scala --- @@ -65,6 +65,9 @@ class KernelInputStreamSpec // set of data doReturn(system.actorSelection(fakeInputOutputHandlerActor.path.toString)) .when(mockActorLoader).load(MessageType.Incoming.InputReply) +// Allow time for the actors to start. This avoids read() hanging forever +// when running tests in gradle. +Thread.sleep(100) --- End diff -- Opened https://issues.apache.org/jira/browse/TOREE-416
[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-toree/pull/104#discussion_r122046211 --- Diff: kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerTransformer.scala --- @@ -43,7 +43,7 @@ class BrokerTransformer { import scala.concurrent.ExecutionContext.Implicits.global futureResult - .map(results => (Results.Success, Left(results))) + .map(results => (Results.Success, Left(Map("text/plain" -> results --- End diff -- No, kernel-api doesn't depend on protocol so it isn't available. But I think that this code shouldn't be sending mime types in the first place. Sending text/plain here happens because we haven't updated the Python kernel to correctly inspect its objects. Eventually, the python interpreter should send back a result that is a py4j reference to the final value and the python representation (from _repr_ and _repr_html_) or a JVM object and None to indicate that the Jupyter JVM displayer should be called. So for now, I'd rather not change module dependencies.
[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-toree/pull/104#discussion_r121765250 --- Diff: scala-interpreter/build.sbt --- @@ -18,3 +18,4 @@ import sbt.Tests.{Group, SubProcess} */ libraryDependencies ++= Dependencies.sparkAll.value +libraryDependencies += "com.github.jupyter" % "jvm-repr" % "0.1.0" --- End diff -- Yes, we probably should.
[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-toree/pull/104#discussion_r121765086 --- Diff: scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaDisplayers.scala --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.toree.kernel.interpreter.scala + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.Try +import org.apache.spark.SparkContext +import org.apache.spark.sql.Row +import org.apache.spark.sql.SparkSession +import jupyter.Displayer +import jupyter.Displayers +import jupyter.MIMETypes +import org.apache.toree.kernel.protocol.v5.MIMEType +import org.apache.toree.magic.MagicOutput + +object ScalaDisplayers { + + def ensureLoaded(): Unit = () --- End diff -- Will do.
[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-toree/pull/104#discussion_r121757635 --- Diff: scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala --- @@ -18,30 +18,34 @@ package org.apache.toree.kernel.interpreter.scala import java.io.ByteArrayOutputStream -import java.net.{URL, URLClassLoader} -import java.nio.charset.Charset import java.util.concurrent.ExecutionException - import com.typesafe.config.{Config, ConfigFactory} +import jupyter.Displayers import org.apache.spark.SparkContext import org.apache.spark.sql.SparkSession import org.apache.spark.repl.Main - import org.apache.toree.interpreter._ -import org.apache.toree.kernel.api.{KernelLike, KernelOptions} +import org.apache.toree.kernel.api.KernelLike import org.apache.toree.utils.TaskManager import org.slf4j.LoggerFactory import org.apache.toree.kernel.BuildInfo - +import org.apache.toree.kernel.protocol.v5.MIMEType import scala.annotation.tailrec +import scala.collection.JavaConverters._ import scala.concurrent.{Await, Future} import scala.language.reflectiveCalls import scala.tools.nsc.Settings import scala.tools.nsc.interpreter.{IR, OutputStream} import scala.tools.nsc.util.ClassPath -import scala.util.{Try => UtilTry} +import scala.util.matching.Regex class ScalaInterpreter(private val config:Config = ConfigFactory.load) extends Interpreter with ScalaInterpreterSpecific { + import ScalaInterpreter._ + + ScalaDisplayers.ensureLoaded() --- End diff -- This makes sure that the registration in `ScalaDisplayers` happens when that class loads.
[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-toree/pull/104#discussion_r121757522 --- Diff: scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaDisplayers.scala --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.toree.kernel.interpreter.scala + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.Try +import org.apache.spark.SparkContext +import org.apache.spark.sql.Row +import org.apache.spark.sql.SparkSession +import jupyter.Displayer +import jupyter.Displayers +import jupyter.MIMETypes +import org.apache.toree.kernel.protocol.v5.MIMEType +import org.apache.toree.magic.MagicOutput + +object ScalaDisplayers { + + def ensureLoaded(): Unit = () --- End diff -- Calling this ensures the `ScalaDisplayers` object has been loaded, which performs the registration. Without calling this to make sure it loads, the registration further down never happens and we don't get representations.
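For readers unfamiliar with the idiom discussed in this comment: a Scala `object` is initialized lazily, the first time one of its members is accessed, so an intentionally empty method can be used to force its body (here, the registration) to run. The sketch below is a minimal, self-contained illustration; `Registry` and `DemoDisplayers` are hypothetical stand-ins and are not Toree's or jvm-repr's actual classes.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a displayer registry; not Toree's or jvm-repr's API.
object Registry {
  val registered: mutable.ListBuffer[String] = mutable.ListBuffer.empty
}

object DemoDisplayers {
  // Object body: runs once, the first time any member of DemoDisplayers is accessed.
  Registry.registered += "DemoDisplayers"

  // Intentionally empty: calling it only forces the object to initialize.
  def ensureLoaded(): Unit = ()
}

object EnsureLoadedDemo {
  def main(args: Array[String]): Unit = {
    // DemoDisplayers has not been touched yet, so nothing is registered.
    println(Registry.registered.toList)
    DemoDisplayers.ensureLoaded()
    // A second call is a no-op; the object body does not run again.
    DemoDisplayers.ensureLoaded()
    println(Registry.registered.toList)
  }
}
```

Without the `ensureLoaded()` call, nothing in the program would reference `DemoDisplayers`, so its body would never run.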
[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-toree/pull/104#discussion_r121757204 --- Diff: scala-interpreter/build.sbt --- @@ -18,3 +18,4 @@ import sbt.Tests.{Group, SubProcess} */ libraryDependencies ++= Dependencies.sparkAll.value +libraryDependencies += "com.github.jupyter" % "jvm-repr" % "0.1.0" --- End diff -- I think we'll need to take a look at the other interpreters after this PR. PySpark, for example, should adhere to the common Python convention of using `_repr_html_` for objects that exist in the Python process, and should use the Jupyter library for objects that are resident in the JVM (like DataFrames). The background on Jupyter's jvm-repr library is that I'm trying to work with the Jupyter community to standardize a system for libraries and kernels to collaborate on rich representations for the JVM. Vegas, for example, should register a function to display its objects so it doesn't have to be built into the kernel or bolted on by users.
[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-toree/pull/104#discussion_r121756270 --- Diff: kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/relay/ExecuteRequestRelaySpec.scala --- @@ -88,7 +89,7 @@ class ExecuteRequestRelaySpec extends TestKit( // Expected does not actually match real return of magic, which // is a tuple of ExecuteReply and ExecuteResult val expected = new ExecuteAborted() -interpreterActorProbe.expectMsgClass( +interpreterActorProbe.expectMsgClass(max = Duration(5, TimeUnit.SECONDS), --- End diff -- The default is 3 seconds, but it was flaky for me. This was another work-around that probably shouldn't be here.
[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-toree/pull/104#discussion_r121755733 --- Diff: kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/stream/KernelInputStreamSpec.scala --- @@ -65,6 +65,9 @@ class KernelInputStreamSpec // set of data doReturn(system.actorSelection(fakeInputOutputHandlerActor.path.toString)) .when(mockActorLoader).load(MessageType.Incoming.InputReply) +// Allow time for the actors to start. This avoids read() hanging forever +// when running tests in gradle. +Thread.sleep(100) --- End diff -- I do, too. The problem is that this test is flaky on my machine without it. We need to take a closer look at the flaky test, but I don't think this PR is the place to fix it. How about I open an issue for it and we solve the problem separately?
[GitHub] incubator-toree pull request #124: TOREE-407: Add support for hdfs and s3 to...
Github user rdblue closed the pull request at: https://github.com/apache/incubator-toree/pull/124
[GitHub] incubator-toree pull request #125: TOREE-408: Add support for hdfs and s3 to...
GitHub user rdblue opened a pull request: https://github.com/apache/incubator-toree/pull/125 TOREE-408: Add support for hdfs and s3 to AddJar. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/incubator-toree TOREE-408-add-jar-support-hadoop-file-systems Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-toree/pull/125.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #125 commit b1b436f986b87764eb6db3ca528585e9d1147229 Author: Ryan Blue Date: 2017-03-13T16:17:50Z TOREE-408: Add support for hdfs and s3 to AddJar.
[GitHub] incubator-toree pull request #124: TOREE-407: Add support for hdfs and s3 to...
GitHub user rdblue opened a pull request: https://github.com/apache/incubator-toree/pull/124 TOREE-407: Add support for hdfs and s3 to AddJar. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/incubator-toree TOREE-407-add-jar-support-hadoop-file-systems Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-toree/pull/124.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #124 commit 98c33cf740d4562075ecca603eca90d0ff4c4d8f Author: Ryan Blue Date: 2017-03-13T16:17:50Z TOREE-407: Add support for hdfs and s3 to AddJar.
[GitHub] incubator-toree pull request #123: TOREE-415: Cancel Spark jobs when interru...
GitHub user rdblue opened a pull request: https://github.com/apache/incubator-toree/pull/123 TOREE-415: Cancel Spark jobs when interrupted. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/incubator-toree TOREE-415-interrupt-spark-jobs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-toree/pull/123.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #123 commit da162269c13138f6560bb0d9dab7a6c61b311e26 Author: Ryan Blue Date: 2017-06-02T18:37:31Z TOREE-415: Cancel Spark jobs when interrupted.
[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...
GitHub user rdblue reopened a pull request: https://github.com/apache/incubator-toree/pull/104 TOREE-380: Allow interpreters to format output. This branch has the changes that I made to our Toree distribution to allow interpreters to format output. The goal is to enable better integration when libraries are used with Toree, similar to conventions in IPython like `_repr_html_` methods. In IPython, classes can define a `_repr_html_` method that is used to produce a `text/html` representation of an output object that is shown in the Jupyter notebook. This branch adds a `Displayer` class that can be registered with the Scala interpreter and updates the interpreter so that it fetches the last object of an interpreted block and calls an appropriate displayer to get its output representations. I think this is a reasonable way to maintain type safety and allow libraries to produce HTML outputs in notebooks. I'm also interested in using the plugin system to register these. In addition, there are a few example displayer implementations: * A SparkSession/SparkContext displayer that produces HTML links to a Spark application * A [Vegas](https://github.com/vegas-viz/Vegas) displayer for graphs * A MagicOutput displayer to avoid the need for post-processing * A default displayer for AnyRef that calls toString and (as proof-of-concept) uses reflection to call toHtml Implementation required changing the way outputs from interpreters and blocks are handled. Rather than returning text, an interpreter now returns a map of MIME type to content (which has various type names). It also changes magics to produce MagicOutput, instead of requiring post-processing.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/incubator-toree TOREE-380-add-result-content-map Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-toree/pull/104.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #104 commit 54340466c917d4ac10412305dee4773d09a4b015 Author: Ryan Blue Date: 2017-02-06T02:18:36Z TOREE-380: Allow interpreters to produce output by MIME type. This updates the interpreter API to return a Map of MIME type to content instead of a String that is rendered as text. This allows interpreters to show HTML as cell output. This also updates the output of magic functions to be a similar MIME type to content structure. This is cleaner and no longer requires hacky post-processing before relaying a cell's output. commit a78e9cb9b9c2f755c3b393a8144fe647fdb716bb Author: Ryan Blue Date: 2017-04-10T01:42:17Z TOREE-380: Add support for Jupyter's jvm-repr API. commit fc9889015fa99f31968947ae71e2225a8e3fb504 Author: Ryan Blue Date: 2017-04-30T22:52:25Z TOREE-380: Fix tests. commit 9c9c9dc045cfddc3130ee596afba309dfbd540c4 Author: Ryan Blue Date: 2017-06-09T01:19:34Z TOREE-380: Add JVMReprSpec. commit b4c1510644cf8d33410ad3d71f78c522d27b5bbe Author: Ryan Blue Date: 2017-06-09T16:06:02Z TOREE-380: Fix Python integration tests. commit 7a50f20abe9c9a85b57e39fe3124fffb6f55d797 Author: Ryan Blue Date: 2017-06-09T16:34:01Z TOREE-380: Fix AddJar integration test. commit 1eaf4a0ae605e865deadad60e13328a1c2197e98 Author: Ryan Blue Date: 2017-06-10T18:50:10Z TOREE-380: Add copyright header to new files.
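The idea in the TOREE-380 description (register a displayer per class, look one up for the last value of a cell, and return a map of MIME type to content) can be sketched roughly as follows. This is an illustration under stated assumptions only: `Displayer`, `DisplayerRegistry`, and `Link` are hypothetical names defined here, and the sketch does not reproduce the actual Toree or jvm-repr signatures.

```scala
import scala.collection.mutable

// Hypothetical names throughout; a sketch of the idea, not Toree's or jvm-repr's API.
trait Displayer[T] {
  // Returns a map of MIME type -> rendered content for the given object.
  def display(obj: T): Map[String, String]
}

object DisplayerRegistry {
  private val displayers =
    mutable.LinkedHashMap.empty[Class[_], Displayer[Any]]

  def register[T](cls: Class[T], displayer: Displayer[T]): Unit =
    displayers(cls) = displayer.asInstanceOf[Displayer[Any]]

  // Use the first registered displayer whose class matches the object;
  // otherwise fall back to a plain-text rendering of the value.
  def display(obj: Any): Map[String, String] =
    displayers.collectFirst {
      case (cls, d) if cls.isInstance(obj) => d.display(obj)
    }.getOrElse(Map("text/plain" -> s"$obj"))
}

// Example value type that a library might want rendered as HTML.
final case class Link(url: String, label: String)

object DisplayerDemo {
  def main(args: Array[String]): Unit = {
    DisplayerRegistry.register(classOf[Link], new Displayer[Link] {
      def display(l: Link): Map[String, String] = Map(
        "text/plain" -> s"${l.label} (${l.url})",
        "text/html"  -> s"""<a href="${l.url}">${l.label}</a>"""
      )
    })
    println(DisplayerRegistry.display(Link("http://localhost:4040", "Spark UI")))
    println(DisplayerRegistry.display(42)) // no displayer registered: plain text only
  }
}
```

Returning several MIME types at once lets the front end pick the richest representation it can render, while `text/plain` remains available as a fallback.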
[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...
Github user rdblue closed the pull request at: https://github.com/apache/incubator-toree/pull/104
[GitHub] incubator-toree pull request #122: TOREE-414: Use ThreadFactoryBuilder to se...
GitHub user rdblue opened a pull request: https://github.com/apache/incubator-toree/pull/122 TOREE-414: Use ThreadFactoryBuilder to set name and daemon status. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/incubator-toree TOREE-414-fix-thread-creation Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-toree/pull/122.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #122 commit 904f7bc5d4e153565451283180c33a0952c9f908 Author: Ryan Blue Date: 2017-04-27T21:05:27Z TOREE-414: Use ThreadFactoryBuilder to set name and daemon status.
[GitHub] incubator-toree pull request #121: TOREE-413: Add exit commands and kernel i...
GitHub user rdblue opened a pull request: https://github.com/apache/incubator-toree/pull/121 TOREE-413: Add exit commands and kernel idle timeout. This adds support for "exit", "quit", and ":q" to exit Toree. It also adds a session idle timeout that defaults to 24h and a command-line option to control it. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/incubator-toree TOREE-413-add-exit-and-idle-timeout Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-toree/pull/121.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #121 commit c3e70ac835082a990353dfe584d176737147b4c8 Author: Ryan Blue Date: 2017-04-25T23:17:41Z TOREE-413: Add exit command and idle timeout. commit 6d42083e5aacad6a3ff881766fe1fb4f05480ff2 Author: Ryan Blue Date: 2017-04-26T19:44:46Z TOREE-413: Add --idle-timeout and default config.
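As a rough illustration of the behavior described for TOREE-413, the sketch below checks for the three exit commands and for an idle timeout with a 24-hour default. The object and method names are hypothetical, and matching the whole trimmed cell against the command set is an assumption; the PR text does not say how Toree performs the match or tracks activity.

```scala
import scala.concurrent.duration._

// Hypothetical names; a sketch of the described behavior, not Toree's implementation.
object SessionControl {
  private val exitCommands = Set("exit", "quit", ":q")

  // Default taken from the PR description (24h).
  val defaultIdleTimeout: FiniteDuration = 24.hours

  // Assumption: a cell counts as an exit request only if, after trimming
  // surrounding whitespace, it is exactly one of the exit commands.
  def isExitCommand(code: String): Boolean =
    exitCommands.contains(code.trim)

  // True once the time since the last activity reaches the timeout.
  def isIdle(lastActivityMillis: Long, nowMillis: Long,
             timeout: FiniteDuration = defaultIdleTimeout): Boolean =
    nowMillis - lastActivityMillis >= timeout.toMillis
}
```

For example, `SessionControl.isExitCommand(" :q ")` is true, while `SessionControl.isExitCommand("exit()")` is false under the exact-match assumption.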
[GitHub] incubator-toree pull request #120: TOREE-412: Update AddDeps magic
GitHub user rdblue opened a pull request: https://github.com/apache/incubator-toree/pull/120 TOREE-412: Update AddDeps magic * Removes non-Jar files (POM files) from downloaded artifacts before adding dependencies to the kernel * Updates to Coursier 1.0.0-M15-7 * Uses "default" configuration by default, instead of "compile" for Ivy dependencies * Adds `--classifier` for maven artifacts * Adds `--ivy-configuration` for Ivy artifacts You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/incubator-toree TOREE-412-update-add-deps Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-toree/pull/120.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #120 commit 6289e78064c788e3d6c56397abd7278fd067128d Author: Ryan Blue Date: 2017-04-18T21:32:12Z TOREE-412: Fix AddDeps by removing POM files. commit 642b2d1463d7d48ae48dfafe262957f071fe63ef Author: Ryan Blue Date: 2017-04-19T20:03:09Z TOREE-412: Update Coursier to 1.0.0-M15-7. commit 039b549b6a0af30266f2be429629e7d79dba4fbb Author: Ryan Blue Date: 2017-04-19T21:55:56Z TOREE-412: Set AddDeps configuration to default. commit a7d15e4bbcdb4c321d3ce3996487953375e8d254 Author: Ryan Blue Date: 2017-04-25T18:37:58Z TOREE-412: Add --classifier to AddDeps. commit 8642253b5b694901b9d470c02c210a579cc3827e Author: Ryan Blue Date: 2017-04-25T21:19:48Z TOREE-412: Add --ivy-configuration option to AddDeps.
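The first bullet of TOREE-412 (dropping POM files from the downloaded artifacts before they are added to the kernel) might look something like the sketch below. `ArtifactFilter` is a hypothetical helper, and selecting files by a `.jar` extension is an assumption; the PR text does not state how non-Jar files are identified.

```scala
import java.io.File

// Hypothetical helper; not the code from the PR.
object ArtifactFilter {
  // Assumption: an artifact is a Jar if its file name ends in ".jar"
  // (case-insensitive). Anything else, such as a ".pom" file, is dropped.
  def jarsOnly(artifacts: Seq[File]): Seq[File] =
    artifacts.filter(_.getName.toLowerCase.endsWith(".jar"))
}
```

Given `Seq(new File("lib-1.0.jar"), new File("lib-1.0.pom"))`, only the first file would be kept.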
[GitHub] spark issue #18162: [SPARK-20923] turn tracking of TaskMetrics._updatedBlock...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/18162 @tgravescs, I deployed this to our production environment (based on 2.0.0) a few days ago and haven't hit any problems with it. I think this is good to go, unless something has been added recently that uses the block statuses. +1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18064: [SPARK-20213][SQL] Fix DataFrameWriter operations in SQL...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/18064 Refactor? I thought that was the problem with the [original PR](https://github.com/apache/spark/pull/17540); that PR was too narrow and didn't unify the physical plans to get all metrics working.
[GitHub] spark issue #18181: [SPARK-20958][SQL] Roll back parquet-mr 1.8.2 to 1.8.1
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/18181 It sounds like we should plan on a 1.8.3 and a 1.9.1 soon in the Parquet community. I'll start this up. On Fri, Jun 2, 2017 at 9:47 AM, Michael Allman wrote: > I can't speak to Parquet 1.8.x anymore. We use Parquet 1.9.0 plus a patch > for https://issues.apache.org/jira/browse/PARQUET-783 and have had no > problems. -- Ryan Blue
[GitHub] spark issue #18181: [SPARK-20958][SQL] Roll back parquet-mr 1.8.2 to 1.8.1
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/18181 -1, with comments on the JIRA issue. I think it is better to include the Parquet fixes in 1.8.2 since Parquet doesn't pull in Avro 1.8.1 - that happens when users declare their own dependency on parquet-avro.
[GitHub] spark issue #18064: [SPARK-20213][SQL] Fix DataFrameWriter operations in SQL...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/18064 @cloud-fan, can you summarize how this differs from the original PR #17540? I have time to pick this up again, but I thought that the other PR only needed two changes: * Merge your changes to unify the physical plans when using `ExecuteCommandExec` * Move `withNewExecutionId` to streaming batches, rather than the entire session This seems to have gone through a lot of updates for testing, though, and I don't see my original commits... is this a reimplementation of those changes?
[GitHub] spark issue #18064: [SPARK-20213][SQL] Fix DataFrameWriter operations in SQL...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/18064 Agreed, sorry I haven't updated it. I was out most of last week. I'll get this fixed up as soon as I can. Thanks for all your help!
[GitHub] spark issue #17680: [SPARK-20364][SQL] Support Parquet predicate pushdown on...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/17680 There's an open PR ([#361](https://github.com/apache/parquet-mr/pull/361)), to support quoted column names, but the discussion on the merits of it is on-going. I don't see a huge benefit to supporting `.` in column names, and I'm concerned that it breaks readers like parquet-avro that specifically don't allow them. More discussion from the Spark community is welcome there.
[GitHub] spark issue #17680: [SPARK-20364][SQL] Support Parquet predicate pushdown on...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/17680 @gatorsmile, sorry for not responding, I was on vacation for a few days. Should I still review this even though it is merged?
[GitHub] spark issue #12313: [SPARK-14543] [SQL] Improve InsertIntoTable column resol...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/12313 We were trying to get this in just before the 2.0 release, which was a bad time. We've just been maintaining it in our version, but I'm going to be rebasing it on to 2.1 soon so I'll see what needs to be done to get it in upstream.
[GitHub] spark issue #13206: [SPARK-15420] [SQL] Add repartition and sort to prepare ...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/13206 @HyukjinKwon, that addresses part of what this patch does, but only for writes that go through FileFormatWriter. This patch works for Hive and adds an optimizer rule to add the sort instead of sorting in the writer, which I don't think is a great idea.