[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...

2017-10-13 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19448
  
I have a lot of sympathy for the argument that infrastructure software 
shouldn't have too many backports and that those should generally be bug fixes. 
But if I were working on a Spark distribution at a vendor, this is something I 
would definitely include because it's such a useful feature. I think that by 
not backporting this, we're just pushing that work downstream. Plus, the risk 
of adding this is low: the main behavior change is that users can specify a 
previously-banned committer for Parquet writes. Is it a bug fix? Probably not. 
But it fixes a big blocker.
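
For context, this is roughly the kind of configuration the change unblocks, 
shown as a hedged sketch in a spark-shell session; the committer class name 
below is a placeholder, not a real committer:

```scala
// Hypothetical example: point Parquet writes at a committer that is not a
// ParquetOutputCommitter subclass (for instance an S3-oriented committer),
// using the existing spark.sql.parquet.output.committer.class setting.
// "com.example.commit.CloudStagingCommitter" is a placeholder class name.
spark.conf.set(
  "spark.sql.parquet.output.committer.class",
  "com.example.commit.CloudStagingCommitter")
spark.range(100).write.parquet("s3a://bucket/warehouse/t")
```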


---




[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r144388430
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
   conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
 }
 
+require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

I think once per write operation is fine. It's not like it is once per file.


---




[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...

2017-10-12 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19448
  
Still +1 from me as well.



---




[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r144331909
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
   conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
 }
 
+require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

I think I'd prefer the warn & continue option. It does little good to fail 
so late in a job, when the caller has already indicated that they want to use a 
different committer. Let them write the data out since this isn't a correctness 
issue, and they can add a summary file later if they want. Basically, there's 
less annoyance and interruption by not writing a summary file than by failing a 
job and forcing the user to re-run near the end.
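
To make the suggestion concrete, here is a minimal sketch of the 
warn-and-continue option, reusing the names from the surrounding diff (this is 
not the exact patch):

```scala
// Sketch: if a job summary was requested but the configured committer cannot
// produce one, log a warning and disable the summary instead of failing the
// write this late in the job.
if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
    && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
  logWarning(s"Committer $committerClass is not a ParquetOutputCommitter; " +
    "no summary (_metadata) files will be written for this job.")
  conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
}
```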


---




[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-10-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r144097061
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link DataWriter}.
+ *
+ * The writing procedure is:
+ *   1. Create a writer factory by {@link #createWriterFactory()}, 
serialize and send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create the data writer, and write the data of 
the partition with this
+ *  writer. If all the data are written successfully, call {@link 
DataWriter#commit()}. If
+ *  exception happens during the writing, call {@link 
DataWriter#abort()}.
+ *   3. If all writers are successfully committed, call {@link 
#commit(WriterCommitMessage[])}. If
+ *  some writers are aborted, or the job failed with an unknown 
reason, call
+ *  {@link #abort(WriterCommitMessage[])}.
+ *
+ * Spark won't retry failed writing jobs, users should do it manually in 
their Spark applications if
+ * they want to retry.
+ *
+ * Please refer to the document of commit/abort methods for detailed 
specifications.
+ *
+ * Note that, this interface provides a protocol between Spark and data 
sources for transactional
+ * data writing, but the transaction here is Spark-level transaction, 
which may not be the
+ * underlying storage transaction. For example, Spark successfully writes 
data to a Cassandra data
+ * source, but Cassandra may need some more time to reach consistency at 
storage level.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceV2Writer {
+
+  /**
+   * Creates a writer factory which will be serialized and sent to 
executors.
+   */
+  DataWriterFactory createWriterFactory();
+
+  /**
+   * Commits this writing job with a list of commit messages. The commit 
messages are collected from
+   * successful data writers and are produced by {@link 
DataWriter#commit()}. If this method
+   * fails(throw exception), this writing job is considered to be failed, 
and
+   * {@link #abort(WriterCommitMessage[])} will be called. The written 
data should only be visible
+   * to data source readers if this method successes.
+   *
+   * Note that, one partition may have multiple committed data writers 
because of speculative tasks.
+   * Spark will pick the first successful one and get its commit message. 
Implementations should be
+   * aware of this and handle it correctly, e.g., have a mechanism to make 
sure only one data writer
+   * can commit successfully, or have a way to clean up the data of 
already-committed writers.
+   */
+  void commit(WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers are failed to write 
the records and aborted,
+   * or the Spark job fails with some unknown reasons, or {@link 
#commit(WriterCommitMessage[])}
+   * fails. If this method fails(throw exception), the underlying data 
source may have garbage that
+   * need to be cleaned manually, but these garbage should not be visible 
to data source readers.
+   *
+   * Unless the abortion is triggered by the failure of commit, the given 
messages should h

[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...

2017-10-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19448#discussion_r144088592
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
   conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
 }
 
+require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

`AnalysisException`? Shouldn't this be `SparkException`? By the time this 
runs, Spark has already analyzed, optimized, and planned the job. Doesn't seem 
like failing analysis is appropriate.


---




[GitHub] spark issue #19424: [SPARK-22197][SQL] push down operators to data source be...

2017-10-10 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19424
  
What are the guarantees made by the previous batches in the optimizer? The 
work done by `FilterAndProject` seems redundant to me because the optimizer 
should already push filters below projection. Is that not guaranteed by the 
time this runs?
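
To make the question concrete, this is the behavior being assumed of the 
earlier batches, sketched in a spark-shell session (the optimized plan shown is 
approximate):

```scala
// PushDownPredicate rewrites a Filter above a Project of deterministic
// expressions so that the filter runs below the projection.
val df = spark.range(10).selectExpr("id AS a", "id * 2 AS b")
df.select("a").filter("a > 5").explain(true)
// Optimized plan, roughly (alias substituted into the pushed filter):
//   Project [id AS a]
//   +- Filter (id > 5)
//      +- Range (0, 10, ...)
```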


---




[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-10-10 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19269
  
> There is no restriction to let the output of data writers be visible to 
other writers, so it's possible to launch a write task just for cleaning up the 
data of other writers.

Agreed. Other writers can possibly see and change the data.

I'm not sure if you intend this to cover some aspect of abort, but this 
wouldn't necessarily help with cleanup if you are. In my use case, a writer may 
produce a file in any partition in S3 depending on the data it writes. To clean 
up after another writer, I'd have to go through all of the input data to get 
partitions, then go list those partition locations for uncommitted files. It's 
not really practical to clean up this way so I don't think it could be a 
substitute for passing commit messages to job abort.
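
For illustration, a hedged sketch of what job-level abort can do when commit 
messages are passed in; the `FilesCommitted` message type and `deleteFile` 
helper are hypothetical:

```scala
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

// Hypothetical commit message that records the files a task wrote.
case class FilesCommitted(paths: Seq[String]) extends WriterCommitMessage

// Stand-in for whatever deletion the data source actually performs (S3, HDFS, ...).
def deleteFile(path: String): Unit = println(s"would delete $path")

// With commit messages available, the driver can delete exactly the files that
// committed tasks reported, instead of listing every partition location the
// input data might have touched.
def abortJob(messages: Array[WriterCommitMessage]): Unit =
  messages.collect { case FilesCommitted(paths) => paths }.flatten.foreach(deleteFile)
```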


---




[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-10-09 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19269
  
> The only contract Spark needs is: data written/committed by tasks should 
not be visible to data source readers until the job-level commitment. But they 
can be visible to others like other writing tasks, so it's possible for data 
sources to implement "abort the output of the other writer".

I'm not following what you mean here.

> making DataSourceV2Writer.abort take commit messages is still a 
"best-effort" to clean up the data

Agreed. We should state something about this in the abort job docs: "Commit 
messages passed to abort are the messages for all commits that succeeded and 
sent a commit message to the driver. It is possible, though unlikely, for an 
executor to successfully commit data to a data source, but fail before sending 
the commit message to the driver."



---




[GitHub] spark pull request #19424: [SPARK-22197][SQL] push down operators to data so...

2017-10-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19424#discussion_r143597547
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
AttributeMap, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.optimizer.{PushDownPredicate, 
RemoveRedundantProject}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.sources.v2.reader._
+
+/**
+ * Pushes down various operators to the underlying data source for better 
performance. Operators are
+ * being pushed down with a specific order. As an example, given a LIMIT 
has a FILTER child, you
+ * can't push down LIMIT if FILTER is not completely pushed down. When 
both are pushed down, the
+ * data source should execute FILTER before LIMIT. And required columns 
are calculated at the end,
+ * because when more operators are pushed down, we may need less columns 
at Spark side.
+ */
+object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with 
PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+// make sure filters are at very bottom.
+val prepared = PushDownPredicate(plan)
+val afterPushDown = prepared transformUp {
+  case Filter(condition, r @ DataSourceV2Relation(_, reader)) =>
+val (candidates, containingNonDeterministic) =
+  splitConjunctivePredicates(condition).span(_.deterministic)
+
+val stayUpFilters: Seq[Expression] = reader match {
+  case r: SupportsPushDownCatalystFilters =>
+r.pushCatalystFilters(candidates.toArray)
+
+  case r: SupportsPushDownFilters =>
+// A map from original Catalyst expressions to corresponding 
translated data source
+// filters. If a predicate is not in this map, it means it 
cannot be pushed down.
+val translatedMap: Map[Expression, sources.Filter] = 
candidates.flatMap { p =>
+  DataSourceStrategy.translateFilter(p).map(f => p -> f)
+}.toMap
+
+// Catalyst predicate expressions that cannot be converted to 
data source filters.
+val nonConvertiblePredicates = 
candidates.filterNot(translatedMap.contains)
+
+// Data source filters that cannot be pushed down. An 
unhandled filter means
+// the data source cannot guarantee the rows returned can pass 
the filter.
+// As a result we must return it so Spark can plan an extra 
filter operator.
+val unhandledFilters = 
r.pushFilters(translatedMap.values.toArray).toSet
+val unhandledPredicates = translatedMap.filter { case (_, f) =>
+  unhandledFilters.contains(f)
+}.keys
+
+nonConvertiblePredicates ++ unhandledPredicates
+
+  case _ => candidates
+}
+
+val filterCondition = (stayUpFilters ++ 
containingNonDeterministic).reduceLeftOption(And)
+filterCondition.map(Filter(_, r)).getOrElse(r)
+
+  // TODO: add more push down rules.
+}
+
+// TODO: nested fields pruning
+def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: 
Seq[Attribute]): Unit = {
+  plan match {
+case Project(projectList, child) =>
+  val required = 
projectList.filter(requiredByParent.contains).flatMap(_.references)
+  pushDownRequiredColumns(child, required)
+
+case Fil

[GitHub] spark pull request #19424: [SPARK-22197][SQL] push down operators to data so...

2017-10-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19424#discussion_r143593605
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
AttributeMap, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.optimizer.{PushDownPredicate, 
RemoveRedundantProject}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.sources.v2.reader._
+
+/**
+ * Pushes down various operators to the underlying data source for better 
performance. Operators are
+ * being pushed down with a specific order. As an example, given a LIMIT 
has a FILTER child, you
+ * can't push down LIMIT if FILTER is not completely pushed down. When 
both are pushed down, the
+ * data source should execute FILTER before LIMIT. And required columns 
are calculated at the end,
+ * because when more operators are pushed down, we may need less columns 
at Spark side.
+ */
+object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with 
PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+// make sure filters are at very bottom.
+val prepared = PushDownPredicate(plan)
+val afterPushDown = prepared transformUp {
+  case Filter(condition, r @ DataSourceV2Relation(_, reader)) =>
+val (candidates, containingNonDeterministic) =
+  splitConjunctivePredicates(condition).span(_.deterministic)
--- End diff --

It isn't immediately clear why you would use `span` here instead of 
`partition`. I think it is because `span` will produce all deterministic 
predicates that would run before the first non-deterministic predicate in an 
in-order traversal of the condition, right? If so, then a comment would be 
really useful to make this clear. I'd also like to see a comment about why 
deterministic predicates "after" the first non-deterministic predicate 
shouldn't be pushed down. An example would really help, too.
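
A tiny, self-contained example of the difference in plain Scala (not Catalyst 
types), since the `span` vs `partition` choice is the crux here:

```scala
// Model conjuncts as (sql, deterministic) pairs.
case class Pred(sql: String, deterministic: Boolean)

val conjuncts = Seq(
  Pred("a > 1", deterministic = true),
  Pred("rand() < 0.5", deterministic = false),
  Pred("b = 2", deterministic = true))

// span stops at the first non-deterministic predicate and preserves order:
val (pushable, stayUp) = conjuncts.span(_.deterministic)
// pushable == Seq(Pred("a > 1", true))
// stayUp   == Seq(Pred("rand() < 0.5", false), Pred("b = 2", true))

// partition would also pull "b = 2" into the pushed set, reordering it ahead
// of the non-deterministic predicate and changing which rows that predicate
// gets evaluated on.
```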


---




[GitHub] spark pull request #19424: [SPARK-22197][SQL] push down operators to data so...

2017-10-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19424#discussion_r143597669
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
AttributeMap, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.optimizer.{PushDownPredicate, 
RemoveRedundantProject}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.sources.v2.reader._
+
+/**
+ * Pushes down various operators to the underlying data source for better 
performance. Operators are
+ * being pushed down with a specific order. As an example, given a LIMIT 
has a FILTER child, you
+ * can't push down LIMIT if FILTER is not completely pushed down. When 
both are pushed down, the
+ * data source should execute FILTER before LIMIT. And required columns 
are calculated at the end,
+ * because when more operators are pushed down, we may need less columns 
at Spark side.
+ */
+object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with 
PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+// make sure filters are at very bottom.
+val prepared = PushDownPredicate(plan)
+val afterPushDown = prepared transformUp {
+  case Filter(condition, r @ DataSourceV2Relation(_, reader)) =>
+val (candidates, containingNonDeterministic) =
+  splitConjunctivePredicates(condition).span(_.deterministic)
+
+val stayUpFilters: Seq[Expression] = reader match {
+  case r: SupportsPushDownCatalystFilters =>
+r.pushCatalystFilters(candidates.toArray)
+
+  case r: SupportsPushDownFilters =>
+// A map from original Catalyst expressions to corresponding 
translated data source
+// filters. If a predicate is not in this map, it means it 
cannot be pushed down.
+val translatedMap: Map[Expression, sources.Filter] = 
candidates.flatMap { p =>
+  DataSourceStrategy.translateFilter(p).map(f => p -> f)
+}.toMap
+
+// Catalyst predicate expressions that cannot be converted to 
data source filters.
+val nonConvertiblePredicates = 
candidates.filterNot(translatedMap.contains)
+
+// Data source filters that cannot be pushed down. An 
unhandled filter means
+// the data source cannot guarantee the rows returned can pass 
the filter.
+// As a result we must return it so Spark can plan an extra 
filter operator.
+val unhandledFilters = 
r.pushFilters(translatedMap.values.toArray).toSet
+val unhandledPredicates = translatedMap.filter { case (_, f) =>
+  unhandledFilters.contains(f)
+}.keys
+
+nonConvertiblePredicates ++ unhandledPredicates
+
+  case _ => candidates
+}
+
+val filterCondition = (stayUpFilters ++ 
containingNonDeterministic).reduceLeftOption(And)
+filterCondition.map(Filter(_, r)).getOrElse(r)
+
+  // TODO: add more push down rules.
+}
+
+// TODO: nested fields pruning
+def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: 
Seq[Attribute]): Unit = {
+  plan match {
+case Project(projectList, child) =>
+  val required = 
projectList.filter(requiredByParent.contains).flatMap(_.references)
+  pushDownRequiredColumns(child, required)
+
+case Fil

[GitHub] spark pull request #19424: [SPARK-22197][SQL] push down operators to data so...

2017-10-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19424#discussion_r143591559
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
AttributeMap, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.optimizer.{PushDownPredicate, 
RemoveRedundantProject}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.sources.v2.reader._
+
+/**
+ * Pushes down various operators to the underlying data source for better 
performance. Operators are
+ * being pushed down with a specific order. As an example, given a LIMIT 
has a FILTER child, you
+ * can't push down LIMIT if FILTER is not completely pushed down. When 
both are pushed down, the
+ * data source should execute FILTER before LIMIT. And required columns 
are calculated at the end,
+ * because when more operators are pushed down, we may need less columns 
at Spark side.
+ */
+object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with 
PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+// make sure filters are at very bottom.
+val prepared = PushDownPredicate(plan)
--- End diff --

Why apply this rule one more time? Is there reason to suspect that 
predicates won't already be pushed and that one more run would be worth it?


---




[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...

2017-10-09 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19394
  
Thanks for reviewing, @gatorsmile!


---




[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-10-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r143524057
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleWritableDataSource.java
 ---
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.ReadTask;
+import org.apache.spark.sql.sources.v2.writer.*;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+
+public class JavaSimpleWritableDataSource implements DataSourceV2, 
ReadSupport, WriteSupport {
+  private StructType schema = new StructType().add("i", "long").add("j", 
"long");
+
+  class Reader implements DataSourceV2Reader {
+private String path;
+
+Reader(String path) {
+  this.path = path;
+}
+
+@Override
+public StructType readSchema() {
+  return schema;
+}
+
+@Override
+public List<ReadTask<Row>> createReadTasks() {
+  return java.util.Arrays.asList(new JavaSimpleCSVReadTask(path));
+}
+  }
+
+  static class JavaSimpleCSVReadTask implements ReadTask<Row>, DataReader<Row> {
+private String path;
+private volatile Iterator<String> lines;
+private volatile String currentLine;
+
+JavaSimpleCSVReadTask(Iterator<String> lines) {
+  this.lines = lines;
+}
+
+JavaSimpleCSVReadTask(String path) {
+  this.path = path;
+}
+
+@Override
+public DataReader<Row> createReader() {
+  assert path != null;
+  try {
+if (Files.exists(Paths.get(path))) {
+  return new 
JavaSimpleCSVReadTask(Files.readAllLines(Paths.get(path)).iterator());
+} else {
+  return new JavaSimpleCSVReadTask(Collections.emptyIterator());
+}
+  } catch (IOException e) {
+throw new RuntimeException(e);
--- End diff --

Does Spark have a `RuntimeIOException`? I typically like to use those so 
that the IOException can still be caught and handled by code that chooses to.
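
For reference, the pattern being described is just a thin unchecked wrapper, 
roughly like this sketch (not claiming Spark ships one):

```scala
import java.io.IOException
import java.nio.file.{Files, Paths}

// Rethrow as unchecked, but keep the original IOException as the cause so
// callers that care can still catch and handle it.
class RuntimeIOException(cause: IOException) extends RuntimeException(cause)

def readAllLinesUnchecked(path: String): java.util.List[String] =
  try Files.readAllLines(Paths.get(path))
  catch { case e: IOException => throw new RuntimeIOException(e) }
```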


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143517522
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -274,19 +274,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] 
with Logging with Serializ
 val byteArrayRdd = getByteArrayRdd()
 
 val results = ArrayBuffer[InternalRow]()
-byteArrayRdd.collect().foreach { bytes =>
-  decodeUnsafeRows(bytes).foreach(results.+=)
+byteArrayRdd.collect().foreach { rdd =>
+  decodeUnsafeRows(rdd._2).foreach(results.+=)
 }
 results.toArray
   }
 
+  private[spark] def executeCollectIterator(): (Long, 
Iterator[InternalRow]) = {
+val countsAndBytes = getByteArrayRdd().collect()
+val total = countsAndBytes.map(_._1).sum
+val rows = countsAndBytes.iterator.flatMap(rdd => 
decodeUnsafeRows(rdd._2))
+(total, rows)
+  }
+
   /**
* Runs this query returning the result as an iterator of InternalRow.
*
* @note Triggers multiple jobs (one for each partition).
*/
   def executeToIterator(): Iterator[InternalRow] = {
-getByteArrayRdd().toLocalIterator.flatMap(decodeUnsafeRows)
+getByteArrayRdd().toLocalIterator.flatMap(rdd => 
decodeUnsafeRows(rdd._2))
--- End diff --

Fixed.


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143517490
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -274,19 +274,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] 
with Logging with Serializ
 val byteArrayRdd = getByteArrayRdd()
 
 val results = ArrayBuffer[InternalRow]()
-byteArrayRdd.collect().foreach { bytes =>
-  decodeUnsafeRows(bytes).foreach(results.+=)
+byteArrayRdd.collect().foreach { rdd =>
+  decodeUnsafeRows(rdd._2).foreach(results.+=)
 }
 results.toArray
   }
 
+  private[spark] def executeCollectIterator(): (Long, 
Iterator[InternalRow]) = {
+val countsAndBytes = getByteArrayRdd().collect()
+val total = countsAndBytes.map(_._1).sum
+val rows = countsAndBytes.iterator.flatMap(rdd => 
decodeUnsafeRows(rdd._2))
--- End diff --

Fixed.


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143317742
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ---
@@ -58,7 +58,7 @@ class ConfigBehaviorSuite extends QueryTest with 
SharedSQLContext {
   withSQLConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION.key -> 
"1") {
 // If we only sample one point, the range boundaries will be 
pretty bad and the
 // chi-sq value would be very high.
-assert(computeChiSquareTest() > 1000)
+assert(computeChiSquareTest() > 300)
--- End diff --

Updating the SparkPlan code to avoid creating unnecessary RDDs fixed this 
test because the same random seed is used. But, I think we should keep this 
change so we don't have to track down this problem again in the future. This 
bound is perfectly safe because the balanced chi-sq value is 10, while the 
unbalanced is much larger.


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143317686
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -280,13 +280,20 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] 
with Logging with Serializ
 results.toArray
   }
 
+  private[spark] def executeCollectIterator(): (Long, 
Iterator[InternalRow]) = {
+val countsAndBytes = getByteArrayRdd().collect()
+val total = countsAndBytes.map(_._1).sum
+val rows = countsAndBytes.iterator.map(_._2).flatMap(decodeUnsafeRows)
+(total, rows)
+  }
+
   /**
* Runs this query returning the result as an iterator of InternalRow.
*
* @note Triggers multiple jobs (one for each partition).
*/
   def executeToIterator(): Iterator[InternalRow] = {
-getByteArrayRdd().toLocalIterator.flatMap(decodeUnsafeRows)
+getByteArrayRdd().toLocalIterator.map(_._2).flatMap(decodeUnsafeRows)
--- End diff --

Fixed.


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143317518
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -280,13 +280,20 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] 
with Logging with Serializ
 results.toArray
   }
 
+  private[spark] def executeCollectIterator(): (Long, 
Iterator[InternalRow]) = {
+val countsAndBytes = getByteArrayRdd().collect()
+val total = countsAndBytes.map(_._1).sum
+val rows = countsAndBytes.iterator.map(_._2).flatMap(decodeUnsafeRows)
+(total, rows)
+  }
+
   /**
* Runs this query returning the result as an iterator of InternalRow.
*
* @note Triggers multiple jobs (one for each partition).
*/
   def executeToIterator(): Iterator[InternalRow] = {
-getByteArrayRdd().toLocalIterator.flatMap(decodeUnsafeRows)
+getByteArrayRdd().toLocalIterator.map(_._2).flatMap(decodeUnsafeRows)
--- End diff --

Good point. I guess from the tests that this will create a new RDD, which 
isn't what we want.


---




[GitHub] spark issue #19448: [SPARK-22217] [SQL] ParquetFileFormat to support arbitra...

2017-10-06 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19448
  
+1

I completely agree that using a ParquetOutputCommitter should be optional.


---




[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...

2017-10-06 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19394
  
Anyone have a clue what the python error could be? It doesn't look related.


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143286130
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ---
@@ -73,25 +73,37 @@ case class BroadcastExchangeExec(
 try {
   val beforeCollect = System.nanoTime()
   // Note that we use .executeCollect() because we don't want to 
convert data to Scala types
-  val input: Array[InternalRow] = child.executeCollect()
-  if (input.length >= 51200) {
+  val (numRows, input) = child.executeCollectIterator()
+  if (numRows >= 51200) {
 throw new SparkException(
-  s"Cannot broadcast the table with more than 512 millions 
rows: ${input.length} rows")
+  s"Cannot broadcast the table with more than 512 millions 
rows: $numRows rows")
   }
+
   val beforeBuild = System.nanoTime()
   longMetric("collectTime") += (beforeBuild - beforeCollect) / 
100
-  val dataSize = 
input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
+
+  // Construct the relation.
+  val relation = mode.transform(input, Some(numRows))
+
+  val dataSize = relation match {
+case map: HashedRelation =>
+  map.estimatedSize
+case arr: Array[InternalRow] =>
+  arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
+case _ =>
+  numRows * 512 // guess: each row is about 512 bytes
--- End diff --

Fixed.


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143286104
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ---
@@ -73,25 +73,37 @@ case class BroadcastExchangeExec(
 try {
   val beforeCollect = System.nanoTime()
   // Note that we use .executeCollect() because we don't want to 
convert data to Scala types
--- End diff --

Updated.


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143285737
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ---
@@ -58,7 +58,7 @@ class ConfigBehaviorSuite extends QueryTest with 
SharedSQLContext {
   withSQLConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION.key -> 
"1") {
 // If we only sample one point, the range boundaries will be 
pretty bad and the
 // chi-sq value would be very high.
-assert(computeChiSquareTest() > 1000)
+assert(computeChiSquareTest() > 300)
--- End diff --

@rxin, the difference that is causing this to fail is that `rdd.id` is 15 
instead of 14 because of the change to `getByteArrayRdd`. That id is used to 
seed the random generator and the chi-sq value isn't high enough. The previous 
value must have been unusually high. With so few data points, this can vary 
quite a bit so I think changing the bound to 300 is a good fix.


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143228054
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ---
@@ -73,25 +73,37 @@ case class BroadcastExchangeExec(
 try {
   val beforeCollect = System.nanoTime()
   // Note that we use .executeCollect() because we don't want to 
convert data to Scala types
-  val input: Array[InternalRow] = child.executeCollect()
-  if (input.length >= 51200) {
+  val (numRows, input) = child.executeCollectIterator()
+  if (numRows >= 51200) {
 throw new SparkException(
-  s"Cannot broadcast the table with more than 512 millions 
rows: ${input.length} rows")
+  s"Cannot broadcast the table with more than 512 millions 
rows: $numRows rows")
   }
+
   val beforeBuild = System.nanoTime()
   longMetric("collectTime") += (beforeBuild - beforeCollect) / 
100
-  val dataSize = 
input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
+
+  // Construct the relation.
+  val relation = mode.transform(input, Some(numRows))
+
+  val dataSize = relation match {
+case map: HashedRelation =>
+  map.estimatedSize
--- End diff --

True, but I think using the accurate size is better. Maybe we should 
revisit the 8gb maximum instead of using a size that isn't correct. What's the 
reason for the 8gb max?

In master, building a broadcast table just under the 8gb limit would 
require 8gb for the rows (what we actually count here), plus the size of those 
rows compressed, which is probably at least 1gb. So we're talking about a 
pretty huge broadcast table to hit the case where this check will cause 
failures that didn't happen before. Not that it couldn't happen, but I've never 
seen this in practice and we have some huge jobs.


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143226823
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ---
@@ -73,25 +73,37 @@ case class BroadcastExchangeExec(
 try {
   val beforeCollect = System.nanoTime()
   // Note that we use .executeCollect() because we don't want to 
convert data to Scala types
-  val input: Array[InternalRow] = child.executeCollect()
-  if (input.length >= 51200) {
+  val (numRows, input) = child.executeCollectIterator()
+  if (numRows >= 51200) {
 throw new SparkException(
-  s"Cannot broadcast the table with more than 512 millions 
rows: ${input.length} rows")
+  s"Cannot broadcast the table with more than 512 millions 
rows: $numRows rows")
   }
+
   val beforeBuild = System.nanoTime()
   longMetric("collectTime") += (beforeBuild - beforeCollect) / 
100
-  val dataSize = 
input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
+
+  // Construct the relation.
+  val relation = mode.transform(input, Some(numRows))
+
+  val dataSize = relation match {
+case map: HashedRelation =>
+  map.estimatedSize
+case arr: Array[InternalRow] =>
--- End diff --

Yeah, both cases are used. The only one that won't match is `case _ => ...`


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143226714
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ---
@@ -73,25 +73,37 @@ case class BroadcastExchangeExec(
 try {
   val beforeCollect = System.nanoTime()
   // Note that we use .executeCollect() because we don't want to 
convert data to Scala types
-  val input: Array[InternalRow] = child.executeCollect()
-  if (input.length >= 51200) {
+  val (numRows, input) = child.executeCollectIterator()
+  if (numRows >= 51200) {
 throw new SparkException(
-  s"Cannot broadcast the table with more than 512 millions 
rows: ${input.length} rows")
+  s"Cannot broadcast the table with more than 512 millions 
rows: $numRows rows")
   }
+
   val beforeBuild = System.nanoTime()
   longMetric("collectTime") += (beforeBuild - beforeCollect) / 
100
-  val dataSize = 
input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
+
+  // Construct the relation.
+  val relation = mode.transform(input, Some(numRows))
+
+  val dataSize = relation match {
+case map: HashedRelation =>
+  map.estimatedSize
+case arr: Array[InternalRow] =>
+  arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
+case _ =>
+  numRows * 512 // guess: each row is about 512 bytes
--- End diff --

This won't cause a regression because all the broadcast modes return either an 
Array or a HashedRelation. This is just in case there is a path that returns 
something different in the future. Maybe it would be better to throw an 
exception here. What do you think?
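
If the fallback were changed as suggested, the last case might look roughly 
like this (a sketch against the diff above, not the merged code):

```scala
// Sketch: fail loudly if a future BroadcastMode returns something other than
// an Array[InternalRow] or a HashedRelation, instead of guessing 512 bytes/row.
val dataSize = relation match {
  case map: HashedRelation =>
    map.estimatedSize
  case arr: Array[InternalRow] =>
    arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
  case other =>
    throw new SparkException(
      s"Cannot estimate the size of broadcast relation ${other.getClass.getName}")
}
```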


---




[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...

2017-10-05 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19394
  
Here's the error message: TestFailedException: 347.5272 was not greater 
than 1000


---




[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...

2017-10-05 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19394
  
Yes, we've been running this in production for a few weeks now.


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-05 Thread rdblue
GitHub user rdblue reopened a pull request:

https://github.com/apache/spark/pull/19394

[SPARK-22170][SQL] Reduce memory consumption in broadcast joins.

## What changes were proposed in this pull request?

This updates the broadcast join code path to lazily decompress pages and
iterate through UnsafeRows to prevent all rows from being held in memory
while the broadcast table is being built.

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark broadcast-driver-memory

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19394.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19394






---




[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...

2017-10-05 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19394
  
@rxin, any idea why this would fail ConfigBehaviorSuite? I don't think the 
failure is related because that test doesn't use a broadcast join. Should I 
rebase on master?


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-05 Thread rdblue
Github user rdblue closed the pull request at:

https://github.com/apache/spark/pull/19394


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r143060736
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 ---
@@ -228,7 +228,7 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
 testSparkPlanMetrics(df, 2, Map(
   1L -> (("BroadcastHashJoin", Map(
 "number of output rows" -> 2L,
-"avg hash probe (min, med, max)" -> "\n(1, 1, 1)"
+"avg hash probe (min, med, max)" -> "\n(1.1, 1.1, 1.1)"
--- End diff --

This test was incorrect. I debugged the metric and the value is 
consistently 1.1, with 9 lookups and 10 probes. 10/9 = 1.1... The reason why 
the test was passing before is that the metrics check was hitting a [race 
condition](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala#L165-L169)
 that caused the test to pass. If I change the values in master to 1.1, the 
tests still pass and demonstrate that the check isn't really happening.


---




[GitHub] spark pull request #19394: [SPARK-22170][SQL] Reduce memory consumption in b...

2017-10-03 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19394#discussion_r142474766
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -280,13 +280,20 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] 
with Logging with Serializ
 results.toArray
   }
 
+  private[spark] def executeCollectIterator(): (Long, 
Iterator[InternalRow]) = {
+val countsAndBytes = getByteArrayRdd().collect()
--- End diff --

I agree, but this is better than before. This only keeps all of the rows in 
memory in compressed form, and then streams through the compressed blocks. 
Before this patch, each row was copied into a buffer while the compressed 
blocks were still held, so the rows existed both compressed and uncompressed 
at the same time. The uncompressed copies are what this fixes; we can follow up 
with something better that streams blocks from executors.
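
The memory difference, as a rough contrast using the names from the diff above 
(a sketch, not a standalone program):

```scala
// Before (roughly): every collected block is decoded eagerly, so the
// compressed blocks and all decoded rows are held at the same time.
val eagerRows: Array[InternalRow] =
  countsAndBytes.flatMap { case (_, bytes) => decodeUnsafeRows(bytes) }

// After: the blocks stay compressed and rows are decoded on demand while the
// broadcast relation is built.
val lazyRows: Iterator[InternalRow] =
  countsAndBytes.iterator.flatMap { case (_, bytes) => decodeUnsafeRows(bytes) }
```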


---




[GitHub] spark issue #19394: [SPARK-22170][SQL] Reduce memory consumption in broadcas...

2017-09-29 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19394
  
Ideally, this would also use a TaskMemoryManager so the driver can spill 
results to disk instead of dying with an OOM. Is there any plan to add a memory 
manager for the driver?


---




[GitHub] spark pull request #19394: SPARK-22170: Reduce memory consumption in broadca...

2017-09-29 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/spark/pull/19394

SPARK-22170: Reduce memory consumption in broadcast joins.

This updates the broadcast join code path to lazily decompress pages and
iterate through UnsafeRows to prevent all rows from being held in memory
while the broadcast table is being built.

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark broadcast-driver-memory

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19394.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19394


commit d82ffca19c84589dfcb87a81996fb3ce2e31e91a
Author: Ryan Blue 
Date:   2017-08-11T00:41:56Z

SPARK-22170: Reduce memory consumption in broadcast joins.

This updates the broadcast join code path to lazily decompress pages and
iterate through UnsafeRows to prevent all rows from being held in memory
while the broadcast table is being built.




---




[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r141679351
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link DataWriter}.
+ *
+ * The writing procedure is:
+ *   1. Create a writer factory by {@link #createWriterFactory()}, 
serialize and send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create the data writer, and write the data of 
the partition with this
+ *  writer. If all the data are written successfully, call {@link 
DataWriter#commit()}. If
+ *  exception happens during the writing, call {@link 
DataWriter#abort()}. This step may repeat
+ *  several times as Spark will retry failed tasks.
+ *   3. If all writers are successfully committed, call {@link 
#commit(WriterCommitMessage[])}. If
+ *  some writers are aborted, or the job failed with an unknown 
reason, call {@link #abort()}.
+ *
+ * Spark may launch speculative tasks in step 2, so there may be more than 
one data writer working
+ * simultaneously for the same partition. Implementations should handle 
this case correctly, e.g.,
+ * make sure only one data writer can commit successfully, or only admit 
one committed data writer
+ * and ignore/revert others at job level.
+ *
+ * Note that, data sources are responsible for providing transaction 
ability by implementing the
+ * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link 
DataWriter} correctly.
+ * The transaction here is Spark-level transaction, which may not be the 
underlying storage
+ * transaction. For example, Spark successfully write data to a Cassandra 
data source, but
+ * Cassandra may need some more time to reach consistency at storage level.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceV2Writer {
+
+  /**
+   * Creates a writer factory which will be serialized and sent to 
executors.
+   */
+  DataWriteFactory createWriterFactory();
+
+  /**
+   * Commits this writing job with a list of commit messages. The commit 
messages are collected from
+   * all data writers and are produced by {@link DataWriter#commit()}.
+   *
+   * Note that, one partition may have multiple committed data writers 
because of speculative tasks.
+   * Spark will pick the first successful one and get its commit message. 
Implementations should be
+   * aware of this and handle it correctly, e.g., have a mechanism to make 
sure only one data writer
+   * can commit successfully, or have a way to clean up the data of 
already committed writers.
+   */
+  void commit(WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers are failed to write 
the records and aborted,
+   * or the Spark job failed with some unknown reasons.
+   *
+   * Note that some data writer may already be committed in this case, 
implementations should be
+   * aware of this and clean up the data.
+   */
+  void abort();
--- End diff --

Sorry I missed that!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For

[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-28 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r141679290
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link WriteTask} that is returned by 
{@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ *   1. Create a write task by {@link #createWriteTask()}, serialize and 
send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create a data writer with the write task, and 
write the data of the
+ *  partition with this writer. If all the data are written 
successfully, call
+ *  {@link DataWriter#commit()}. If exception happens during the 
writing, call
+ *  {@link DataWriter#abort()}. This step may repeat several times as 
Spark will retry failed
+ *  tasks.
+ *   3. Wait until all the writers/partitions are finished, i.e., either 
committed or aborted. If
+ *  all partitions are written successfully, call {@link 
#commit(WriterCommitMessage[])}. If
+ *  some partitions failed and aborted, call {@link #abort()}.
+ *
+ * Note that, data sources are responsible for providing transaction 
ability by implementing the
+ * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link 
DataWriter} correctly.
+ * The transaction here is Spark-level transaction, which may not be the 
underlying storage
+ * transaction. For example, Spark successfully write data to a Cassandra 
data source, but
+ * Cassandra may need some more time to reach consistency at storage level.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceV2Writer {
+
+  /**
+   * Creates a write task which will be serialized and sent to executors. 
For each partition of the
+   * input data(RDD), there will be one write task to write the records.
+   */
+  WriteTask createWriteTask();
+
+  /**
+   * Commits this writing job with a list of commit messages. The commit 
messages are collected from
+   * all data writers for this writing job and are produced by {@link 
DataWriter#commit()}. This
+   * also means all the data are written successfully and all data writers 
are committed.
+   */
+  void commit(WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers are failed to write 
the records and aborted.
+   */
+  void abort();
--- End diff --

Can you explain the situation you're talking about a bit more?

I think Spark should pass everything it can to the abort. I agree that the 
abort here should be flexible and best-effort, but there are situations where 
it can't know how to roll back everything that tasks did without those commit 
messages. There may be cases where Spark can't guarantee whether a commit was 
complete or not, but where it can, it should pass those commit messages.

Say you had 100 successful write tasks out of 400, but the executor died 
before it returned the 100th message (though after committing its data), and 
that executor failure hit the max, so Spark cancelled the job. Even though 
Spark has only 99 commit messages, it should still pass the ones that it can. 
In the S3 case, 
these are the only way the driver kno

[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r141460252
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link DataWriter}.
+ *
+ * The writing procedure is:
+ *   1. Create a writer factory by {@link #createWriterFactory()}, 
serialize and send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create the data writer, and write the data of 
the partition with this
+ *  writer. If all the data are written successfully, call {@link 
DataWriter#commit()}. If
+ *  exception happens during the writing, call {@link 
DataWriter#abort()}. This step may repeat
+ *  several times as Spark will retry failed tasks.
+ *   3. If all writers are successfully committed, call {@link 
#commit(WriterCommitMessage[])}. If
+ *  some writers are aborted, or the job failed with an unknown 
reason, call {@link #abort()}.
+ *
+ * Spark may launch speculative tasks in step 2, so there may be more than 
one data writer working
+ * simultaneously for the same partition. Implementations should handle 
this case correctly, e.g.,
+ * make sure only one data writer can commit successfully, or only admit 
one committed data writer
+ * and ignore/revert others at job level.
+ *
+ * Note that, data sources are responsible for providing transaction 
ability by implementing the
+ * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link 
DataWriter} correctly.
+ * The transaction here is Spark-level transaction, which may not be the 
underlying storage
+ * transaction. For example, Spark successfully write data to a Cassandra 
data source, but
+ * Cassandra may need some more time to reach consistency at storage level.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceV2Writer {
+
+  /**
+   * Creates a writer factory which will be serialized and sent to 
executors.
+   */
+  DataWriteFactory createWriterFactory();
+
+  /**
+   * Commits this writing job with a list of commit messages. The commit 
messages are collected from
+   * all data writers and are produced by {@link DataWriter#commit()}.
+   *
+   * Note that, one partition may have multiple committed data writers 
because of speculative tasks.
+   * Spark will pick the first successful one and get its commit message. 
Implementations should be
+   * aware of this and handle it correctly, e.g., have a mechanism to make 
sure only one data writer
+   * can commit successfully, or have a way to clean up the data of 
already committed writers.
+   */
+  void commit(WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers are failed to write 
the records and aborted,
+   * or the Spark job failed with some unknown reasons.
+   *
+   * Note that some data writer may already be committed in this case, 
implementations should be
+   * aware of this and clean up the data.
+   */
+  void abort();
--- End diff --

This still needs to be passed the WriterCommitMessages for committed tasks. 
(My previous

[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-21 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r140389681
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link WriteTask} that is returned by 
{@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ *   1. Create a write task by {@link #createWriteTask()}, serialize and 
send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create a data writer with the write task, and 
write the data of the
--- End diff --

Maybe you're right. If a source supports rollback for sequential tasks, 
then that might mean it has to support rollback for concurrent tasks. I was 
originally thinking of a case like JDBC without transactions. So an insert 
actually creates the rows and rolling back concurrent tasks would delete rows 
from the other task. But in that case, inserts are idempotent so it wouldn't 
matter. I'm not sure if there's a case where you can (or would) implement 
rolling back, but can't handle concurrency. Let's just leave it until someone 
has a use case for it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-21 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r140371372
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link WriteTask} that is returned by 
{@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ *   1. Create a write task by {@link #createWriteTask()}, serialize and 
send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create a data writer with the write task, and 
write the data of the
--- End diff --

Maybe Spark should be able to disable speculative tasks at runtime instead 
of requiring data sources to roll back. I'd prefer that, but I don't think it 
is something that needs to change now. We can always add an interface, if and 
when it is supported, that allows data sources to signal that they don't 
support speculation.
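
To make that concrete, what I have in mind is just a marker interface along 
these lines (the name is mine; nothing in this PR defines it):

```java
import org.apache.spark.annotation.InterfaceStability;

/**
 * Hypothetical mix-in for a writer that cannot tolerate concurrent (speculative)
 * attempts for the same partition. Spark could check for it and disable
 * speculation for that write job.
 */
@InterfaceStability.Evolving
public interface SupportsNoSpeculation {
}
```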


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-20 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r140018805
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java 
---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * A data writer returned by {@link WriteTask#createWriter(int, int, int)} 
and is responsible for
+ * writing data for an input RDD partition.
+ *
+ * Note that, Currently the type `T` can only be {@link 
org.apache.spark.sql.Row} for normal data
+ * source writers, or {@link 
org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source
+ * writers that mix in {@link SupportsWriteUnsafeRow}.
+ */
+@InterfaceStability.Evolving
+public interface DataWriter<T> {
+
+  void write(T record);
--- End diff --

I thought that might be the case. :)

But we can't count on writes never throwing exceptions, so it would be 
good to also document what happens when a writer does throw an exception here.
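
To spell out the behavior I'm assuming should be documented (this is my sketch 
of the per-partition contract, not the actual code in this PR): any exception 
from `write()` should lead to `abort()` on the same writer before the task is 
failed.

```java
import java.util.Iterator;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

// Sketch only: this helper is illustrative, not Spark's implementation.
class WriteTaskLoop {
  static WriterCommitMessage run(DataWriter<Row> writer, Iterator<Row> rows) throws Exception {
    try {
      while (rows.hasNext()) {
        writer.write(rows.next());
      }
      return writer.commit();   // success: return this task's commit message
    } catch (Exception e) {
      writer.abort();           // best-effort cleanup of partially written data
      throw e;                  // then fail the task so Spark can retry it
    }
  }
}
```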


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-20 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r140017014
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Command.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, 
RowEncoder}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+
+case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: 
LogicalPlan)
+  extends RunnableCommand {
--- End diff --

It isn't just the UI, though. I've seen cases where 
InsertIntoHadoopFsRelationCommand is run inside a 
CreateDataSourceTableAsSelectCommand, and their enforcement of write modes 
depends on one another. Our internal S3 committer updates partition information 
and handles partition-level conflicts, but that requires the table to exist 
before the write (to check which partitions already exist). When we moved table 
creation into the CTAS command, it broke the insert command, even though the 
two should be separate.

While it's convenient to have a logical plan and a physical plan together, 
I think this ends up getting misused. That's why I'm advocating changing how 
we use RunnableCommand. To fix it, we should introduce a node that holds both a 
command and a logical plan, so we can optimize the entire plan and run the 
command at the right time.

Clearly, this isn't a blocker for this PR; I just want to mention that I 
see this pattern causing a lot of problems every time we pull in a new Spark 
version.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139841435
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Command.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, 
RowEncoder}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+
+case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: 
LogicalPlan)
+  extends RunnableCommand {
--- End diff --

I know similar commands do the same, but this should not implement 
`RunnableCommand`. I'm not sure of the original intent for it, but I think 
`RunnableCommand` should be used for small tasks that are carried out on the 
driver, like DDL.

Using `RunnableCommand` in cases like this, where a job needs to run, ends up 
effectively embedding a logical plan in a physical plan, which has caused a few 
messy issues. For example, the Spark SQL tab doesn't show the entire operation; 
it only shows the outer command, without metrics.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139839037
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java 
---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * A data writer returned by {@link WriteTask#createWriter(int, int, int)} 
and is responsible for
+ * writing data for an input RDD partition.
+ *
+ * Note that, Currently the type `T` can only be {@link 
org.apache.spark.sql.Row} for normal data
+ * source writers, or {@link 
org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source
+ * writers that mix in {@link SupportsWriteUnsafeRow}.
+ */
+@InterfaceStability.Evolving
+public interface DataWriter<T> {
+
+  void write(T record);
--- End diff --

What happens if this throws an exception?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139838908
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link WriteTask} that is returned by 
{@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ *   1. Create a write task by {@link #createWriteTask()}, serialize and 
send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create a data writer with the write task, and 
write the data of the
+ *  partition with this writer. If all the data are written 
successfully, call
+ *  {@link DataWriter#commit()}. If exception happens during the 
writing, call
+ *  {@link DataWriter#abort()}. This step may repeat several times as 
Spark will retry failed
+ *  tasks.
+ *   3. Wait until all the writers/partitions are finished, i.e., either 
committed or aborted. If
+ *  all partitions are written successfully, call {@link 
#commit(WriterCommitMessage[])}. If
+ *  some partitions failed and aborted, call {@link #abort()}.
+ *
+ * Note that, data sources are responsible for providing transaction 
ability by implementing the
+ * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link 
DataWriter} correctly.
+ * The transaction here is Spark-level transaction, which may not be the 
underlying storage
+ * transaction. For example, Spark successfully write data to a Cassandra 
data source, but
+ * Cassandra may need some more time to reach consistency at storage level.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceV2Writer {
+
+  /**
+   * Creates a write task which will be serialized and sent to executors. 
For each partition of the
+   * input data(RDD), there will be one write task to write the records.
+   */
+  WriteTask createWriteTask();
+
+  /**
+   * Commits this writing job with a list of commit messages. The commit 
messages are collected from
+   * all data writers for this writing job and are produced by {@link 
DataWriter#commit()}. This
+   * also means all the data are written successfully and all data writers 
are committed.
+   */
+  void commit(WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers are failed to write 
the records and aborted.
+   */
+  void abort();
--- End diff --

Should this accept the commit messages for committed tasks, or will tasks 
be aborted?

I'm thinking of the case where you're writing to S3. Say a data source 
writes all attempt files to the final locations, then removes any attempts that 
are aborted. If the job aborts with some tasks that have already committed, 
then either this should have the option of cleaning up those files (passed in 
the commit message) or all of the tasks should be individually aborted. I'd 
prefer to have this abort clean up successful/committed tasks because the logic 
may be different.
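
To illustrate with the S3 case (everything below is hypothetical: this PR's 
`abort()` takes no arguments, and the commit message type is mine), the commit 
messages are the only record the driver has of which final files a committed 
task wrote, so cleanup is just deleting what they list:

```java
import java.io.Serializable;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

// Hypothetical commit message that records the final paths a committed task wrote.
class FileListCommitMessage implements WriterCommitMessage, Serializable {
  final List<String> committedFiles;

  FileListCommitMessage(List<String> committedFiles) {
    this.committedFiles = committedFiles;
  }
}

class AbortCleanupSketch {
  // If abort received the messages of already-committed tasks, cleaning up their
  // output is straightforward; without them, the driver has no way to find it.
  static void abort(WriterCommitMessage[] committedMessages, FileSystem fs) throws Exception {
    for (WriterCommitMessage message : committedMessages) {
      if (message instanceof FileListCommitMessage) {
        for (String file : ((FileListCommitMessage) message).committedFiles) {
          fs.delete(new Path(file), false);   // remove output of a committed task
        }
      }
    }
  }
}
```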


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139838459
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link WriteTask} that is returned by 
{@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ *   1. Create a write task by {@link #createWriteTask()}, serialize and 
send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create a data writer with the write task, and 
write the data of the
+ *  partition with this writer. If all the data are written 
successfully, call
+ *  {@link DataWriter#commit()}. If exception happens during the 
writing, call
+ *  {@link DataWriter#abort()}. This step may repeat several times as 
Spark will retry failed
+ *  tasks.
+ *   3. Wait until all the writers/partitions are finished, i.e., either 
committed or aborted. If
+ *  all partitions are written successfully, call {@link 
#commit(WriterCommitMessage[])}. If
+ *  some partitions failed and aborted, call {@link #abort()}.
+ *
+ * Note that, data sources are responsible for providing transaction 
ability by implementing the
+ * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link 
DataWriter} correctly.
+ * The transaction here is Spark-level transaction, which may not be the 
underlying storage
+ * transaction. For example, Spark successfully write data to a Cassandra 
data source, but
+ * Cassandra may need some more time to reach consistency at storage level.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceV2Writer {
+
+  /**
+   * Creates a write task which will be serialized and sent to executors. 
For each partition of the
+   * input data(RDD), there will be one write task to write the records.
+   */
+  WriteTask createWriteTask();
+
+  /**
+   * Commits this writing job with a list of commit messages. The commit 
messages are collected from
+   * all data writers for this writing job and are produced by {@link 
DataWriter#commit()}. This
+   * also means all the data are written successfully and all data writers 
are committed.
--- End diff --

I think this should state the guarantees when this method is called:

* One and only one attempt for every task has committed successfully.
* The messages array contains the commit message from every committed task 
attempt, which is no more than one per task.
* All other attempts have been successfully aborted (is this a guarantee, 
or just that aborts have been attempted?)
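
If the answer is that these aren't guarantees, then every implementation ends 
up doing job-level de-duplication itself, something like the sketch below (the 
message type that carries its partition id is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

// Hypothetical message that knows which partition produced it, so the job-level
// commit can admit one writer per partition and revert the rest.
class PartitionedCommitMessage implements WriterCommitMessage {
  final int partitionId;

  PartitionedCommitMessage(int partitionId) {
    this.partitionId = partitionId;
  }
}

class JobCommitSketch {
  static void commit(PartitionedCommitMessage[] messages) {
    Map<Integer, PartitionedCommitMessage> admitted = new HashMap<>();
    for (PartitionedCommitMessage message : messages) {
      PartitionedCommitMessage first = admitted.putIfAbsent(message.partitionId, message);
      if (first != null) {
        revert(message);            // duplicate attempt for a partition: clean up its output
      }
    }
    publish(admitted.values());     // make the admitted writers' output visible
  }

  static void revert(PartitionedCommitMessage duplicate) { /* source-specific cleanup */ }

  static void publish(Iterable<PartitionedCommitMessage> admitted) { /* source-specific */ }
}
```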


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139836068
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link WriteTask} that is returned by 
{@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ *   1. Create a write task by {@link #createWriteTask()}, serialize and 
send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create a data writer with the write task, and 
write the data of the
--- End diff --

How does this handle speculative execution? This description makes it sound 
like attempts are only run serially. I'd like to have an interface that signals 
whether concurrent attempts are supported, for data sources that act like the 
direct output committer and can't handle speculation.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139835603
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link WriteTask} that is returned by 
{@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ *   1. Create a write task by {@link #createWriteTask()}, serialize and 
send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create a data writer with the write task, and 
write the data of the
+ *  partition with this writer. If all the data are written 
successfully, call
+ *  {@link DataWriter#commit()}. If exception happens during the 
writing, call
+ *  {@link DataWriter#abort()}. This step may repeat several times as 
Spark will retry failed
+ *  tasks.
+ *   3. Wait until all the writers/partitions are finished, i.e., either 
committed or aborted. If
+ *  all partitions are written successfully, call {@link 
#commit(WriterCommitMessage[])}. If
+ *  some partitions failed and aborted, call {@link #abort()}.
+ *
+ * Note that, data sources are responsible for providing transaction 
ability by implementing the
+ * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link 
DataWriter} correctly.
+ * The transaction here is Spark-level transaction, which may not be the 
underlying storage
+ * transaction. For example, Spark successfully write data to a Cassandra 
data source, but
+ * Cassandra may need some more time to reach consistency at storage level.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceV2Writer {
+
+  /**
+   * Creates a write task which will be serialized and sent to executors. 
For each partition of the
+   * input data(RDD), there will be one write task to write the records.
+   */
+  WriteTask createWriteTask();
--- End diff --

I think it's confusing to have only one write "task" that is serialized and 
used everywhere. It is implicitly copied by the serialization into multiple 
distinct tasks. Is there a better name for it? Maybe call the `DataWriter` the 
`WriteTask` and serialize something with a better name?
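
Something like the following is what I mean: serialize an explicitly named 
factory and keep `DataWriter` as the per-partition object. The interface below 
is just my sketch; the name and parameter list are illustrative.

```java
import java.io.Serializable;

import org.apache.spark.sql.sources.v2.writer.DataWriter;

// Hypothetical naming: the serialized object is a factory, and the per-partition
// DataWriter is created from it on the executor for each task attempt.
public interface DataWriterFactory<T> extends Serializable {
  DataWriter<T> createWriter(int partitionId, int attemptNumber);
}
```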


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139834571
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java ---
@@ -30,9 +30,8 @@
   /**
* Creates a {@link DataSourceV2Reader} to scan the data from this data 
source.
*
-   * @param options the options for this data source reader, which is an 
immutable case-insensitive
-   *string-to-string map.
-   * @return a reader that implements the actual read logic.
+   * @param options the options for the returned data source reader, which 
is an immutable
+   *case-insensitive string-to-string map.
--- End diff --

It would make this much easier to review if changes to the read path were 
taken out and committed in a follow-up to #19136.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139832973
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link WriteTask} that is returned by 
{@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ *   1. Create a write task by {@link #createWriteTask()}, serialize and 
send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create a data writer with the write task, and 
write the data of the
+ *  partition with this writer. If all the data are written 
successfully, call
+ *  {@link DataWriter#commit()}. If exception happens during the 
writing, call
+ *  {@link DataWriter#abort()}. This step may repeat several times as 
Spark will retry failed
+ *  tasks.
+ *   3. Wait until all the writers/partitions are finished, i.e., either 
committed or aborted. If
+ *  all partitions are written successfully, call {@link 
#commit(WriterCommitMessage[])}. If
+ *  some partitions failed and aborted, call {@link #abort()}.
--- End diff --

The main reason why I wanted a separate SPIP for the write path was this 
point in the doc:

> Ideally partitioning/bucketing concept should not be exposed in the Data 
Source API V2, because they are just techniques for data skipping and 
pre-partitioning. However, these 2 concepts are already widely used in Spark, 
e.g. DataFrameWriter.partitionBy and DDL syntax like ADD PARTITION. To be 
consistent, we need to add partitioning/bucketing to Data Source V2, so that 
the implementations can be able to specify partitioning/bucketing for 
read/write.

There's a lot in there that's worth thinking about and possibly changing:

1. Ideally, the DataSourceV2 API wouldn't support bucketing/partitioning
2. The current DataFrameWriter API is what we should continue to support
3. Implementations should supply bucketing and partitioning for writes 
because of 2

**Bucketing/partitioning**: It comes down to the level at which this API is 
going to be used. It looks like this API currently ignores bucketing and 
partitioning (unless my read-through was too quick). I think I agree that in 
the long term that's a good thing, but we need ways for a data source to tell 
Spark about its requirements for incoming data.

In the current version, it looks like Spark would know how to prepare data 
for writers outside of this API (rather than including support as suggested by 
point 3). When writing a partitioned table, Spark would get the partitioning 
from the table definition in the metastore and automatically sort by partition 
columns. Is that right?

I'd like to move the data store's requirements behind this API. For 
example, writing to HBase files directly requires sorting by key first. We 
don't want to do the sort in the writer because it may duplicate work (and 
isn't captured in the physical plan), and we also don't want to require Spark 
to know about the requirements of the HBase data store, or any other specific 
implementation.

**DataFrameWriter API**: I'd like to talk about separating the API for 
table definitions and writes, but not necessarily as

[GitHub] spark pull request #19136: [SPARK-15689][SQL] data source v2 read path

2017-09-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r138681562
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java 
---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Closeable;
+
+/**
+ * A data reader returned by a read task and is responsible for outputting 
data for a RDD partition.
+ */
+public interface DataReader extends Closeable {
--- End diff --

The initialization is done when creating the `DataReader` from a 
`ReadTask`. That ensures the initialization actually happens (it's easy to 
forget an `open()` call) and simplifies the checks that are needed, because a 
`DataReader` can't exist in an uninitialized state.
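
A sketch of the pattern with a made-up JDBC source (no Spark interfaces are 
implemented here, since the exact factory method name may still change in the 
PR):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

// The "read task" performs the initialization when it creates the reader, so a
// reader never exists in an un-opened state and there is no open() call to forget.
class JdbcReadTask {
  private final String url;
  private final String query;

  JdbcReadTask(String url, String query) {
    this.url = url;
    this.query = query;
  }

  JdbcDataReader createReader() throws SQLException {
    Connection conn = DriverManager.getConnection(url);   // initialization happens here
    return new JdbcDataReader(conn, conn.createStatement().executeQuery(query));
  }
}

class JdbcDataReader implements AutoCloseable {
  private final Connection conn;
  private final ResultSet rows;

  JdbcDataReader(Connection conn, ResultSet rows) {
    this.conn = conn;
    this.rows = rows;
  }

  boolean next() throws SQLException {
    return rows.next();
  }

  Object get(int ordinal) throws SQLException {
    return rows.getObject(ordinal + 1);   // JDBC columns are 1-based
  }

  @Override
  public void close() throws SQLException {
    rows.close();
    conn.close();
  }
}
```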


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-11 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r138123207
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
+import 
org.apache.spark.sql.sources.v2.reader.downward.{CatalystFilterPushDownSupport, 
ColumnPruningSupport, FilterPushDownSupport}
+
+object DataSourceV2Strategy extends Strategy {
+  // TODO: write path
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+case PhysicalOperation(projects, filters, DataSourceV2Relation(output, 
reader)) =>
+  val attrMap = AttributeMap(output.zip(output))
+
+  val projectSet = AttributeSet(projects.flatMap(_.references))
+  val filterSet = AttributeSet(filters.flatMap(_.references))
+
+  // Match original case of attributes.
+  // TODO: nested fields pruning
+  val requiredColumns = (projectSet ++ filterSet).toSeq.map(attrMap)
--- End diff --

It seems reasonable to only request the columns that will be used, or that 
are referenced by residual filters after pushdown.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-08 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137829674
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader
+import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport
+
+case class DataSourceV2Relation(
+output: Seq[AttributeReference],
+reader: DataSourceV2Reader) extends LeafNode {
+
+  override def computeStats(): Statistics = reader match {
+case r: StatisticsSupport => Statistics(sizeInBytes = 
r.getStatistics.sizeInBytes())
+case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes)
+  }
+}
+
+object DataSourceV2Relation {
+  def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
+new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
--- End diff --

I think we should add a way to provide partition values outside of the 
columnar reader. It wouldn't be too difficult to add a method on `ReadTask` 
that returns them, then create a joined row in the scan exec. Otherwise, a scan 
wastes a lot of memory repeating the same values in every row.
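
Roughly what I have in mind (the mix-in name and methods are mine; nothing in 
the PR defines them): the task exposes its constant partition values once, and 
the scan joins them onto each row instead of materializing them into every 
`UnsafeRow`.

```java
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

// Hypothetical mix-in for ReadTask: the values returned here are shared by every
// row the task produces, so the scan can append them once per task.
public interface SupportsPartitionValues {
  StructType partitionSchema();

  InternalRow partitionValues();
}
```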


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137597910
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader
+import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport
+
+case class DataSourceV2Relation(
+output: Seq[AttributeReference],
+reader: DataSourceV2Reader) extends LeafNode {
+
+  override def computeStats(): Statistics = reader match {
+case r: StatisticsSupport => Statistics(sizeInBytes = 
r.getStatistics.sizeInBytes())
+case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes)
+  }
+}
+
+object DataSourceV2Relation {
+  def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
+new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
--- End diff --

Are you saying that partition pruning isn't delegated to the data source in 
this interface?

I was just looking into how the data source should provide partition data, 
or at least fields that are the same for all rows in a `ReadTask`. It would be 
nice to have a way to pass those up instead of materializing them in each 
`UnsafeRow`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137591425
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader
+import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport
+
+case class DataSourceV2Relation(
+output: Seq[AttributeReference],
+reader: DataSourceV2Reader) extends LeafNode {
+
+  override def computeStats(): Statistics = reader match {
+case r: StatisticsSupport => Statistics(sizeInBytes = 
r.getStatistics.sizeInBytes())
+case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes)
+  }
+}
+
+object DataSourceV2Relation {
+  def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
+new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
--- End diff --

Is this the right schema? The docs for `readSchema` say it is the result of 
pushdown and projection, which doesn't seem appropriate for a `Relation`. Does 
a relation represent a table that can be filtered and projected, or does it 
represent a single read? At least in the Hive read path, it's a table.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r137587367
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SchemaRequiredDataSourceV2.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A variant of `DataSourceV2` which requires users to provide a schema 
when reading data. A data
+ * source can inherit both `DataSourceV2` and `SchemaRequiredDataSourceV2` 
if it supports both schema
+ * inference and user-specified schemas.
+ */
+public interface SchemaRequiredDataSourceV2 {
+
+  /**
+   * Create a `DataSourceV2Reader` to scan the data for this data source.
+   *
+   * @param schema the full schema of this data source reader. Full schema 
usually maps to the
+   *   physical schema of the underlying storage of this data 
source reader, e.g.
+   *   parquet files, JDBC tables, etc, while this reader may 
not read data with full
--- End diff --

Maybe update the doc here, since JDBC sources and Parquet files probably 
shouldn't implement this. CSV and JSON are the examples that come to mind for 
sources that require a schema.
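
For example, a schema-required source would look roughly like this; these are 
made-up stand-ins for the real interfaces, since the full signature isn't 
visible in this diff:

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical stand-ins that only mirror the shape of a schema-required
// source; they are not the actual Spark interfaces.
trait SimpleReader {
  def readSchema(): StructType
}

trait SchemaRequiredSource {
  // CSV- or JSON-style sources need the caller to supply the schema,
  // because the files themselves don't carry reliable type information.
  def createReader(userSchema: StructType): SimpleReader
}

class CsvLikeSource extends SchemaRequiredSource {
  override def createReader(userSchema: StructType): SimpleReader =
    new SimpleReader {
      override def readSchema(): StructType = userSchema
    }
}
```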


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19136: [DO NOT MERGE][SPARK-15689][SQL] data source v2

2017-09-06 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/19136
  
Thanks for pinging me. I left comments on the older PR, since other 
discussion was already there. If you'd prefer comments here, just let me know.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] incubator-toree pull request #128: [TOREE-425] Force sparkContext initializa...

2017-07-17 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/incubator-toree/pull/128#discussion_r127859204
  
--- Diff: 
kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala 
---
@@ -82,6 +84,8 @@ trait StandardComponentInitialization extends 
ComponentInitialization {
 
 initializePlugins(config, pluginManager)
 
+initializeSparkContext(config, kernel)
--- End diff --

Should this be here, or in the kernel's initialization? I think we 
eventually want Toree to not require Spark, in which case we would have a 
SparkKernel and a regular Kernel. Then it would be the responsibility of the 
SparkKernel to detect that the application is in cluster mode and initialize 
the context. Doing this sooner rather than later would avoid more changes.
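
Roughly what I'm picturing; the `SparkKernel` class and the helper names below 
are hypothetical stand-ins for the existing Toree pieces:

```scala
// The base kernel stays Spark-free; only the Spark flavor decides whether the
// context must be created eagerly (cluster mode) or lazily.
class BaseKernel {
  def initialize(): Unit = {
    // interpreters, comm handlers, plugins, ...
  }
}

class SparkKernel extends BaseKernel {
  // the real check would be SparkUtils.isSparkClusterMode(kernel.sparkConf);
  // reading the deploy mode from system properties is just a stand-in
  private def isClusterMode: Boolean =
    sys.props.get("spark.submit.deployMode").contains("cluster")

  override def initialize(): Unit = {
    super.initialize()
    if (isClusterMode) {
      // TOREE-425: force SparkContext creation so the application is
      // registered as Running before the first cell executes
      forceSparkContextInit()
    }
  }

  private def forceSparkContextInit(): Unit = {
    // e.g. touch kernel.sparkContext / kernel.sparkSession here
  }
}
```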


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #128: [TOREE-425] Force sparkContext initializa...

2017-07-17 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/incubator-toree/pull/128#discussion_r127858852
  
--- Diff: 
kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala 
---
@@ -94,6 +98,18 @@ trait StandardComponentInitialization extends 
ComponentInitialization {
 
   }
 
+  def initializeSparkContext(config:Config, kernel:Kernel) = {
+// TOREE:425 Spark cluster mode requires a context to be initialized 
before
+// it register the application as Running
+if ( SparkUtils.isSparkClusterMode(kernel.sparkConf) ) {
--- End diff --

Nit: Are the spaces needed? I think this is non-standard for Scala or Java 
projects.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #128: [TOREE-425] Force sparkContext initializa...

2017-07-17 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/incubator-toree/pull/128#discussion_r127858771
  
--- Diff: kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala ---
@@ -20,8 +20,11 @@ package org.apache.toree.kernel.api
 import java.io.{InputStream, PrintStream}
 import java.net.URI
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit, TimeoutException}
+
--- End diff --

Nit: unnecessary changes in imports make backports and branch maintenance 
harder. I'd prefer not to have these changes unless there is a stated style 
that this change conforms to.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #128: [TOREE-425] Force sparkContext initializa...

2017-07-17 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/incubator-toree/pull/128#discussion_r127858615
  
--- Diff: kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala ---
@@ -414,13 +417,15 @@ class Kernel (
   Await.result(sessionFuture, Duration(100, TimeUnit.MILLISECONDS))
 } catch {
   case timeout: TimeoutException =>
-// getting the session is taking a long time, so assume that 
Spark
-// is starting and print a message
-display.content(
-  MIMEType.PlainText, "Waiting for a Spark session to 
start...")
+// in cluster mode, the sparkContext is forced to initialize
+if (SparkUtils.isSparkClusterMode(defaultSparkConf) == false) {
--- End diff --

Instead of preventing the message from being sent, I think the logic should 
be updated so that cluster mode uses the second case. That case just creates a 
new context without the futures or Await calls.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark issue #18450: [SPARK-21238][SQL] allow nested SQL execution

2017-06-28 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/18450
  
I think it is good that this would no longer throw exceptions at runtime. 
Is the purpose of not allowing nested executions to minimize the queries shown 
in the UI? If that's the only purpose then I agree that just eliminating the 
empty ones is a good strategy.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18419: [SPARK-20213][SQL][follow-up] introduce SQLExecut...

2017-06-27 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/18419#discussion_r124331858
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala ---
@@ -100,7 +105,9 @@ object SQLExecution {
   // all accumulator metrics will be 0. It will confuse people if we 
show them in Web UI.
   //
   // A real case is the `DataFrame.count` method.
-  throw new IllegalArgumentException(s"$EXECUTION_ID_KEY is already 
set")
+  throw new IllegalArgumentException(s"$EXECUTION_ID_KEY is already 
set, please wrap your " +
--- End diff --

The problem is that this is an easy error to hit and it shouldn't affect 
end users. It is better to warn that something is wrong than to fail a job that 
would otherwise succeed because of a bug in Spark. As for the error message, I 
think it is fine if we intend to leave it in. I'd just rather not fail user 
jobs here.

I assume that DataSource developers will have tests, but probably not ones 
that know to set spark.testing. Is there a better way to detect test cases?
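
Something like this is the shape I have in mind, assuming the `spark.testing` 
system property is the simplest available test signal (the object and method 
names here are made up):

```scala
import org.slf4j.LoggerFactory

object NestedExecutionCheck {
  private val logger = LoggerFactory.getLogger(getClass)

  // Spark's own tests set the spark.testing system property; treating that as
  // the test signal is an assumption about the simplest available check.
  private def isTesting: Boolean = sys.props.contains("spark.testing")

  def checkNotAlreadySet(executionIdKey: String, alreadySet: Boolean): Unit = {
    if (alreadySet) {
      if (isTesting) {
        // fail loudly in Spark's own tests so developers catch the bug early
        throw new IllegalArgumentException(s"$executionIdKey is already set")
      } else {
        // at runtime this is a Spark bug, not a user error: warn and keep the
        // job running instead of failing it near the end
        logger.warn(s"$executionIdKey is already set; continuing anyway")
      }
    }
  }
}
```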


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18419: [SPARK-20213][SQL][follow-up] introduce SQLExecution.ign...

2017-06-26 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/18419
  
One minor comment, otherwise +1.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18419: [SPARK-20213][SQL][follow-up] introduce SQLExecut...

2017-06-26 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/18419#discussion_r124058801
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala ---
@@ -100,7 +105,9 @@ object SQLExecution {
   // all accumulator metrics will be 0. It will confuse people if we 
show them in Web UI.
   //
   // A real case is the `DataFrame.count` method.
-  throw new IllegalArgumentException(s"$EXECUTION_ID_KEY is already 
set")
+  throw new IllegalArgumentException(s"$EXECUTION_ID_KEY is already 
set, please wrap your " +
--- End diff --

Nested execution is a developer problem, not a user problem. That's why the 
[original 
PR](https://github.com/apache/spark/pull/17540/files#diff-ab49028253e599e6e74cc4f4dcb2e3a8R127)
 did not throw `IllegalArgumentException` outside of testing. I think that 
should still be how this is handled.

If this is thrown at runtime, adding the text about 
`ignoreNestedExecutionId` is confusing for users, who can't (or shouldn't) set 
it. A comment is more appropriate if users will see this message. If the change 
to only throw during testing is added, then I think it is fine to add the text 
to the exception.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] incubator-toree pull request #123: TOREE-415: Cancel Spark jobs when interru...

2017-06-19 Thread rdblue
Github user rdblue closed the pull request at:

https://github.com/apache/incubator-toree/pull/123


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #123: TOREE-415: Cancel Spark jobs when interru...

2017-06-19 Thread rdblue
GitHub user rdblue reopened a pull request:

https://github.com/apache/incubator-toree/pull/123

TOREE-415: Cancel Spark jobs when interrupted.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/incubator-toree 
TOREE-415-interrupt-spark-jobs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-toree/pull/123.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #123






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...

2017-06-15 Thread rdblue
Github user rdblue closed the pull request at:

https://github.com/apache/incubator-toree/pull/104


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...

2017-06-14 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/incubator-toree/pull/104#discussion_r122049346
  
--- Diff: 
kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerTransformer.scala
 ---
@@ -43,7 +43,7 @@ class BrokerTransformer {
 import scala.concurrent.ExecutionContext.Implicits.global
 
 futureResult
-  .map(results => (Results.Success, Left(results)))
+  .map(results => (Results.Success, Left(Map("text/plain" -> 
results
--- End diff --

Opened https://issues.apache.org/jira/browse/TOREE-418


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...

2017-06-14 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/incubator-toree/pull/104#discussion_r122048356
  
--- Diff: 
kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/relay/ExecuteRequestRelaySpec.scala
 ---
@@ -88,7 +89,7 @@ class ExecuteRequestRelaySpec extends TestKit(
 // Expected does not actually match real return of magic, which
 // is a tuple of ExecuteReply and ExecuteResult
 val expected = new ExecuteAborted()
-interpreterActorProbe.expectMsgClass(
+interpreterActorProbe.expectMsgClass(max = Duration(5, 
TimeUnit.SECONDS),
--- End diff --

Opened https://issues.apache.org/jira/browse/TOREE-417


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...

2017-06-14 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/incubator-toree/pull/104#discussion_r122047867
  
--- Diff: 
kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/stream/KernelInputStreamSpec.scala
 ---
@@ -65,6 +65,9 @@ class KernelInputStreamSpec
 // set of data
 
doReturn(system.actorSelection(fakeInputOutputHandlerActor.path.toString))
   .when(mockActorLoader).load(MessageType.Incoming.InputReply)
+// Allow time for the actors to start. This avoids read() hanging 
forever
+// when running tests in gradle.
+Thread.sleep(100)
--- End diff --

Opened https://issues.apache.org/jira/browse/TOREE-416


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...

2017-06-14 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/incubator-toree/pull/104#discussion_r122046211
  
--- Diff: 
kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerTransformer.scala
 ---
@@ -43,7 +43,7 @@ class BrokerTransformer {
 import scala.concurrent.ExecutionContext.Implicits.global
 
 futureResult
-  .map(results => (Results.Success, Left(results)))
+  .map(results => (Results.Success, Left(Map("text/plain" -> 
results
--- End diff --

No, kernel-api doesn't depend on protocol so it isn't available. But I 
think that this code shouldn't be sending mime types in the first place. 
Sending text/plain here happens because we haven't updated the Python kernel to 
correctly inspect its objects.

Eventually, the python interpreter should send back a result that is a py4j 
reference to the final value and the python representation (from _repr_ and 
_repr_html_) or a JVM object and None to indicate that the Jupyter JVM 
displayer should be called. So for now, I'd rather not change module 
dependencies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...

2017-06-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/incubator-toree/pull/104#discussion_r121765250
  
--- Diff: scala-interpreter/build.sbt ---
@@ -18,3 +18,4 @@ import sbt.Tests.{Group, SubProcess}
  */
 
 libraryDependencies ++= Dependencies.sparkAll.value
+libraryDependencies += "com.github.jupyter" % "jvm-repr" % "0.1.0"
--- End diff --

Yes, we probably should.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...

2017-06-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/incubator-toree/pull/104#discussion_r121765086
  
--- Diff: 
scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaDisplayers.scala
 ---
@@ -0,0 +1,207 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+package org.apache.toree.kernel.interpreter.scala
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.Try
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+import jupyter.Displayer
+import jupyter.Displayers
+import jupyter.MIMETypes
+import org.apache.toree.kernel.protocol.v5.MIMEType
+import org.apache.toree.magic.MagicOutput
+
+object ScalaDisplayers {
+
+  def ensureLoaded(): Unit = ()
--- End diff --

Will do.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...

2017-06-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/incubator-toree/pull/104#discussion_r121757635
  
--- Diff: 
scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala
 ---
@@ -18,30 +18,34 @@
 package org.apache.toree.kernel.interpreter.scala
 
 import java.io.ByteArrayOutputStream
-import java.net.{URL, URLClassLoader}
-import java.nio.charset.Charset
 import java.util.concurrent.ExecutionException
-
 import com.typesafe.config.{Config, ConfigFactory}
+import jupyter.Displayers
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.repl.Main
-
 import org.apache.toree.interpreter._
-import org.apache.toree.kernel.api.{KernelLike, KernelOptions}
+import org.apache.toree.kernel.api.KernelLike
 import org.apache.toree.utils.TaskManager
 import org.slf4j.LoggerFactory
 import org.apache.toree.kernel.BuildInfo
-
+import org.apache.toree.kernel.protocol.v5.MIMEType
 import scala.annotation.tailrec
+import scala.collection.JavaConverters._
 import scala.concurrent.{Await, Future}
 import scala.language.reflectiveCalls
 import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter.{IR, OutputStream}
 import scala.tools.nsc.util.ClassPath
-import scala.util.{Try => UtilTry}
+import scala.util.matching.Regex
 
 class ScalaInterpreter(private val config:Config = ConfigFactory.load) 
extends Interpreter with ScalaInterpreterSpecific {
+  import ScalaInterpreter._
+
+  ScalaDisplayers.ensureLoaded()
--- End diff --

This makes sure that the registration in `ScalaDisplayers` happens when 
that class loads.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...

2017-06-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/incubator-toree/pull/104#discussion_r121757522
  
--- Diff: 
scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaDisplayers.scala
 ---
@@ -0,0 +1,207 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+package org.apache.toree.kernel.interpreter.scala
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.Try
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+import jupyter.Displayer
+import jupyter.Displayers
+import jupyter.MIMETypes
+import org.apache.toree.kernel.protocol.v5.MIMEType
+import org.apache.toree.magic.MagicOutput
+
+object ScalaDisplayers {
+
+  def ensureLoaded(): Unit = ()
--- End diff --

Calling this ensures the `ScalaDisplayers` object has been loaded, which 
performs the registration. Without calling this to make sure it loads, the 
registration further down never happens and we don't get representations.
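
For anyone reading along, a stripped-down illustration of the idiom (the names 
here are made up, not Toree's API):

```scala
import scala.collection.mutable

object ExampleDisplayers {
  // no-op; referencing the object is what matters, because a Scala object's
  // body only runs the first time the object is touched
  def ensureLoaded(): Unit = ()

  private val registry = mutable.Map.empty[Class[_], Any => String]

  private def register(c: Class[_])(f: Any => String): Unit = registry(c) = f

  // these statements run exactly once, on first reference of ExampleDisplayers
  register(classOf[String])(s => s"plain: $s")
  register(classOf[Integer])(i => s"number: $i")
}

// e.g. in an interpreter constructor:
// ExampleDisplayers.ensureLoaded()  // guarantees the registrations above ran
```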


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...

2017-06-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/incubator-toree/pull/104#discussion_r121757204
  
--- Diff: scala-interpreter/build.sbt ---
@@ -18,3 +18,4 @@ import sbt.Tests.{Group, SubProcess}
  */
 
 libraryDependencies ++= Dependencies.sparkAll.value
+libraryDependencies += "com.github.jupyter" % "jvm-repr" % "0.1.0"
--- End diff --

I think we'll need to take a look after this PR at the other interpreters. 
PySpark, for example, should follow the common Python convention of using 
_repr_html_ for objects that exist in the Python process, and should use the 
Jupyter library for objects that are resident in the JVM (like DataFrames).

The background on Jupyter's jvm-repr library is that I'm trying to work 
with the Jupyter community to standardize a system for libraries and kernels to 
collaborate on rich representations for the JVM. Vegas, for example, should 
register a function to display its objects so it doesn't have to be built into 
the kernel or bolted on by users.
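
For a library author, the hook looks roughly like this; `MyChart` is a made-up 
stand-in for something like a Vegas plot, and the `Displayers.register` / 
`MIMETypes` details are my recollection of the 0.1.0 API, so treat them as 
approximate:

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._
import jupyter.{Displayer, Displayers, MIMETypes}

// Hypothetical library type that knows how to render itself as HTML.
case class MyChart(title: String) {
  def toHtml: String = s"<div class='chart'>$title</div>"
}

object MyChartDisplayer {
  def register(): Unit = {
    Displayers.register(classOf[MyChart], new Displayer[MyChart] {
      override def display(chart: MyChart): JMap[String, String] =
        Map(
          MIMETypes.TEXT -> s"chart: ${chart.title}",
          MIMETypes.HTML -> chart.toHtml
        ).asJava
    })
  }
}
```

With something like this, the kernel only needs to look up the registered 
displayer for the last result's class; nothing chart-specific has to be built 
into Toree itself.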


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...

2017-06-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/incubator-toree/pull/104#discussion_r121756270
  
--- Diff: 
kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/relay/ExecuteRequestRelaySpec.scala
 ---
@@ -88,7 +89,7 @@ class ExecuteRequestRelaySpec extends TestKit(
 // Expected does not actually match real return of magic, which
 // is a tuple of ExecuteReply and ExecuteResult
 val expected = new ExecuteAborted()
-interpreterActorProbe.expectMsgClass(
+interpreterActorProbe.expectMsgClass(max = Duration(5, 
TimeUnit.SECONDS),
--- End diff --

The default is 3 seconds, but it was flaky for me. This was another 
work-around that probably shouldn't be here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...

2017-06-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/incubator-toree/pull/104#discussion_r121755733
  
--- Diff: 
kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/stream/KernelInputStreamSpec.scala
 ---
@@ -65,6 +65,9 @@ class KernelInputStreamSpec
 // set of data
 
doReturn(system.actorSelection(fakeInputOutputHandlerActor.path.toString))
   .when(mockActorLoader).load(MessageType.Incoming.InputReply)
+// Allow time for the actors to start. This avoids read() hanging 
forever
+// when running tests in gradle.
+Thread.sleep(100)
--- End diff --

I do, too. The problem is that this test is flaky on my machine without it. 
We need to take a closer look at the flaky test, but I don't think this PR is 
the place to fix it. How about I open an issue for it and we solve the problem 
separately?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #124: TOREE-407: Add support for hdfs and s3 to...

2017-06-10 Thread rdblue
Github user rdblue closed the pull request at:

https://github.com/apache/incubator-toree/pull/124


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #125: TOREE-408: Add support for hdfs and s3 to...

2017-06-10 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/incubator-toree/pull/125

TOREE-408: Add support for hdfs and s3 to AddJar.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/incubator-toree 
TOREE-408-add-jar-support-hadoop-file-systems

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-toree/pull/125.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #125


commit b1b436f986b87764eb6db3ca528585e9d1147229
Author: Ryan Blue 
Date:   2017-03-13T16:17:50Z

TOREE-408: Add support for hdfs and s3 to AddJar.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #124: TOREE-407: Add support for hdfs and s3 to...

2017-06-10 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/incubator-toree/pull/124

TOREE-407: Add support for hdfs and s3 to AddJar.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/incubator-toree 
TOREE-407-add-jar-support-hadoop-file-systems

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-toree/pull/124.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #124


commit 98c33cf740d4562075ecca603eca90d0ff4c4d8f
Author: Ryan Blue 
Date:   2017-03-13T16:17:50Z

TOREE-407: Add support for hdfs and s3 to AddJar.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #123: TOREE-415: Cancel Spark jobs when interru...

2017-06-10 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/incubator-toree/pull/123

TOREE-415: Cancel Spark jobs when interrupted.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/incubator-toree 
TOREE-415-interrupt-spark-jobs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-toree/pull/123.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #123


commit da162269c13138f6560bb0d9dab7a6c61b311e26
Author: Ryan Blue 
Date:   2017-06-02T18:37:31Z

TOREE-415: Cancel Spark jobs when interrupted.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...

2017-06-10 Thread rdblue
GitHub user rdblue reopened a pull request:

https://github.com/apache/incubator-toree/pull/104

TOREE-380: Allow interpreters to format output.

This branch has the changes that I made to our Toree distribution to allow 
interpreters to format output. The goal is to enable better integration when 
libraries are used with Toree, similar to conventions in IPython like 
`_repr_html_` methods.

In IPython, classes can define a `_repr_html_` method that is used to 
produce a `text/html` representation of an output object that is shown in the 
Jupyter notebook. This branch adds a `Displayer` class that can be registered 
with the Scala interpreter and updates the interpreter so that it fetches the 
last object of an interpreted block and calls an appropriate displayer to get 
its output representations. I think this is a reasonable way to maintain type 
safety and allow libraries to produce HTML outputs in notebooks. I'm also 
interested in using the plugin system to register these.

In addition, there are a few example displayer implementations:
* A SparkSession/SparkContext displayer that produces HTML links to the 
Spark application
* A [Vegas](https://github.com/vegas-viz/Vegas) displayer for graphs
* A MagicOutput displayer to avoid the need for post-processing
* A default displayer for AnyRef that calls toString and (as 
proof-of-concept) uses reflection to call toHtml

Implementation required changing the way outputs from interpreters and 
blocks are handled. Rather than an interpreter returning text, it now returns a 
mime-type to content map (which has various type names). It also changes magic 
to produce MagicOutput, instead of requiring post-processing.
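
To make the output flow concrete, here is a condensed sketch; the trait and 
object names are illustrative, not the exact Toree classes:

```scala
trait OutputDisplayer[T] {
  def display(value: T): Map[String, String]   // MIME type -> content
}

object OutputFormatter {
  private var displayers = List.empty[(Class[_], OutputDisplayer[Any])]

  def register[T](clazz: Class[T], displayer: OutputDisplayer[T]): Unit =
    displayers = (clazz -> displayer.asInstanceOf[OutputDisplayer[Any]]) :: displayers

  // called with the last value produced by an interpreted block
  def format(lastResult: AnyRef): Map[String, String] =
    displayers
      .collectFirst { case (c, d) if c.isInstance(lastResult) => d.display(lastResult) }
      .getOrElse(Map("text/plain" -> String.valueOf(lastResult)))
}
```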

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/incubator-toree 
TOREE-380-add-result-content-map

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-toree/pull/104.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #104


commit 54340466c917d4ac10412305dee4773d09a4b015
Author: Ryan Blue 
Date:   2017-02-06T02:18:36Z

TOREE-380: Allow interpreters to produce output by MIME type.

This updates the interpreter API to return a Map of MIME type to content
instead of a String that is rendered as text. This allows interpreters
to show HTML as cell output.

This also updates the output of magic functions to be a similar MIME
type to content structure. This is cleaner and no longer requires
hacky post-processing before relaying a cell's output.

commit a78e9cb9b9c2f755c3b393a8144fe647fdb716bb
Author: Ryan Blue 
Date:   2017-04-10T01:42:17Z

TOREE-380: Add support for Jupyter's jvm-repr API.

commit fc9889015fa99f31968947ae71e2225a8e3fb504
Author: Ryan Blue 
Date:   2017-04-30T22:52:25Z

TOREE-380: Fix tests.

commit 9c9c9dc045cfddc3130ee596afba309dfbd540c4
Author: Ryan Blue 
Date:   2017-06-09T01:19:34Z

TOREE-380: Add JVMReprSpec.

commit b4c1510644cf8d33410ad3d71f78c522d27b5bbe
Author: Ryan Blue 
Date:   2017-06-09T16:06:02Z

TOREE-380: Fix Python integration tests.

commit 7a50f20abe9c9a85b57e39fe3124fffb6f55d797
Author: Ryan Blue 
Date:   2017-06-09T16:34:01Z

TOREE-380: Fix AddJar integration test.

commit 1eaf4a0ae605e865deadad60e13328a1c2197e98
Author: Ryan Blue 
Date:   2017-06-10T18:50:10Z

TOREE-380: Add copyright header to new files.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #104: TOREE-380: Allow interpreters to format o...

2017-06-10 Thread rdblue
Github user rdblue closed the pull request at:

https://github.com/apache/incubator-toree/pull/104


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #122: TOREE-414: Use ThreadFactoryBuilder to se...

2017-06-10 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/incubator-toree/pull/122

TOREE-414: Use ThreadFactoryBuilder to set name and daemon status.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/incubator-toree 
TOREE-414-fix-thread-creation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-toree/pull/122.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #122


commit 904f7bc5d4e153565451283180c33a0952c9f908
Author: Ryan Blue 
Date:   2017-04-27T21:05:27Z

TOREE-414: Use ThreadFactoryBuilder to set name and daemon status.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #121: TOREE-413: Add exit commands and kernel i...

2017-06-10 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/incubator-toree/pull/121

TOREE-413: Add exit commands and kernel idle timeout.

This adds support for "exit", "quit", and ":q" to exit Toree. It also adds 
a session idle timeout that defaults to 24h and a command-line option to 
control it.
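
The idle timeout is roughly this shape (names and wiring are illustrative, not 
the exact implementation): record the last activity time on every execute 
request and shut down when the gap exceeds the configured timeout.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

class IdleTimeoutMonitor(onTimeout: () => Unit,
                         timeoutMillis: Long = 24L * 60 * 60 * 1000) {
  private val lastActivity = new AtomicLong(System.currentTimeMillis())
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // called from the execute-request handler on every incoming cell
  def touch(): Unit = lastActivity.set(System.currentTimeMillis())

  def start(): Unit = scheduler.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = {
      if (System.currentTimeMillis() - lastActivity.get() > timeoutMillis) {
        onTimeout()        // shut the kernel down the same way "exit" does
        scheduler.shutdown()
      }
    }
  }, 1, 1, TimeUnit.MINUTES)
}

// e.g. new IdleTimeoutMonitor(() => shutdownKernel()).start()
```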

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/incubator-toree 
TOREE-413-add-exit-and-idle-timeout

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-toree/pull/121.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #121


commit c3e70ac835082a990353dfe584d176737147b4c8
Author: Ryan Blue 
Date:   2017-04-25T23:17:41Z

TOREE-413: Add exit command and idle timeout.

commit 6d42083e5aacad6a3ff881766fe1fb4f05480ff2
Author: Ryan Blue 
Date:   2017-04-26T19:44:46Z

TOREE-413: Add --idle-timeout and default config.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-toree pull request #120: TOREE-412: Update AddDeps magic

2017-06-10 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/incubator-toree/pull/120

TOREE-412: Update AddDeps magic

* Removes non-Jar files (POM files) from downloaded artifacts before adding 
dependencies to the kernel
* Updates to Coursier 1.0.0-M15-7
* Uses "default" configuration by default, instead of "compile" for Ivy 
dependencies
* Adds `--classifier` for maven artifacts
* Adds `--ivy-configuration` for Ivy artifacts

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/incubator-toree 
TOREE-412-update-add-deps

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-toree/pull/120.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #120


commit 6289e78064c788e3d6c56397abd7278fd067128d
Author: Ryan Blue 
Date:   2017-04-18T21:32:12Z

TOREE-412: Fix AddDeps by removing POM files.

commit 642b2d1463d7d48ae48dfafe262957f071fe63ef
Author: Ryan Blue 
Date:   2017-04-19T20:03:09Z

TOREE-412: Update Coursier to 1.0.0-M15-7.

commit 039b549b6a0af30266f2be429629e7d79dba4fbb
Author: Ryan Blue 
Date:   2017-04-19T21:55:56Z

TOREE-412: Set AddDeps configuration to default.

commit a7d15e4bbcdb4c321d3ce3996487953375e8d254
Author: Ryan Blue 
Date:   2017-04-25T18:37:58Z

TOREE-412: Add --classifier to AddDeps.

commit 8642253b5b694901b9d470c02c210a579cc3827e
Author: Ryan Blue 
Date:   2017-04-25T21:19:48Z

TOREE-412: Add --ivy-configuration option to AddDeps.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark issue #18162: [SPARK-20923] turn tracking of TaskMetrics._updatedBlock...

2017-06-08 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/18162
  
@tgravescs, I deployed this to our production environment (based on 2.0.0) 
a few days ago and haven't hit any problems with it. I think this is good to 
go, unless something has been added recently that uses the block statuses.

+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18064: [SPARK-20213][SQL] Fix DataFrameWriter operations in SQL...

2017-06-08 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/18064
  
Refactor?

I thought that was the problem with the [original 
PR](https://github.com/apache/spark/pull/17540); that PR was too narrow and 
didn't unify the physical plans to get all metrics working.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18181: [SPARK-20958][SQL] Roll back parquet-mr 1.8.2 to 1.8.1

2017-06-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/18181
  
It sounds like we should plan on a 1.8.3 and a 1.9.1 soon in the Parquet
community. I'll start this up.

On Fri, Jun 2, 2017 at 9:47 AM, Michael Allman 
wrote:

> I can't speak to Parquet 1.8.x anymore. We use Parquet 1.9.0 plus a patch
> for https://issues.apache.org/jira/browse/PARQUET-783 and have had no
> problems.
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/18181#issuecomment-305847279>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/AAFXa0FiKB143yePbSsis2Dq-yGOwRLJks5sADy7gaJpZM4Nttfk>
> .
>



-- 
Ryan Blue



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18181: [SPARK-20958][SQL] Roll back parquet-mr 1.8.2 to 1.8.1

2017-06-02 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/18181
  
-1, with comments on the JIRA issue. I think it is better to include the 
Parquet fixes in 1.8.2 since Parquet doesn't pull in Avro 1.8.1 - that happens 
when users declare their own dependency on parquet-avro.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18064: [SPARK-20213][SQL] Fix DataFrameWriter operations in SQL...

2017-05-25 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/18064
  
@cloud-fan, can you summarize how this differs from the original PR #17540?

I have time to pick this up again, but I thought that the other PR only 
needed two changes:

* Merge your changes to unify the physical plans when using 
`ExecuteCommandExec`
* Move `withNewExecutionId` to streaming batches, rather than the entire 
session

This seems to have gone through a lot of updates for testing, though, and I 
don't see my original commits... is this a reimplementation of those changes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18064: [SPARK-20213][SQL] Fix DataFrameWriter operations in SQL...

2017-05-22 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/18064
  
Agreed, sorry I haven't updated it. I was out most of last week. I'll get 
this fixed up as soon as I can. Thanks for all your help!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17680: [SPARK-20364][SQL] Support Parquet predicate pushdown on...

2017-05-18 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/17680
  
There's an open PR ([#361](https://github.com/apache/parquet-mr/pull/361)) 
to support quoted column names, but the discussion on its merits is ongoing. 
I don't see a huge benefit to supporting `.` in column names, and I'm 
concerned that it breaks readers like parquet-avro that specifically don't 
allow them. More discussion from the Spark community is welcome there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17680: [SPARK-20364][SQL] Support Parquet predicate pushdown on...

2017-05-18 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/17680
  
@gatorsmile, sorry for not responding, I was on vacation for a few days. 
Should I still review this even though it is merged?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #12313: [SPARK-14543] [SQL] Improve InsertIntoTable column resol...

2017-05-11 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/12313
  
We were trying to get this in just before the 2.0 release, which was a bad 
time. We've just been maintaining it in our version, but I'm going to be 
rebasing it onto 2.1 soon, so I'll see what needs to be done to get it in 
upstream.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13206: [SPARK-15420] [SQL] Add repartition and sort to prepare ...

2017-05-11 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/13206
  
@HyukjinKwon, that addresses part of what this patch does, but only for 
writes that go through FileFormatWriter. This patch works for Hive and adds an 
optimizer rule to add the sort, instead of sorting in the writer (which I don't 
think is a great idea).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


